
Ingest Kafka records and metadata by API

info

Project-less regional API resources have been deprecated and will be removed by the end of September 2024.

You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023. For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID

Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.

When ingesting data from Apache Kafka topics into Imply Polaris, you can ingest event metadata including the key, Kafka timestamp, and any headers in addition to the event value that stores the message itself.

For example, consider the following record produced to a Kafka topic:

{
  "partition_id": 0,
  "headers": [
    {
      "name": "Header-1",
      "value": "SGVhZGVyLTE="
    },
    {
      "name": "Header-2",
      "value": "SGVhZGVyLTI="
    }
  ],
  "key": {
    "type": "BINARY",
    "data": "aWFtZ3Jvb3QK"
  },
  "value": {
    "type": "JSON",
    "data": {
      "city": "San Francisco"
    }
  },
  "timestamp": "2023-05-18T01:02:34Z"
}
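
The header values and the key in this sample record look Base64-encoded, which is a common way to show raw Kafka bytes in JSON. As a minimal sketch under that assumption (the source does not state the encoding), the following Python snippet decodes them to show the text they carry:

```python
import base64

# Values copied from the sample record. Treating them as Base64 is an
# assumption based on how the record is displayed, not a documented guarantee.
encoded = {
    "Header-1": "SGVhZGVyLTE=",
    "Header-2": "SGVhZGVyLTI=",
    "key": "aWFtZ3Jvb3QK",
}

# Decode each value to bytes, then interpret the bytes as UTF-8 text.
decoded = {
    name: base64.b64decode(value).decode("utf-8")
    for name, value in encoded.items()
}

for name, text in decoded.items():
    # repr() makes trailing whitespace, such as a newline, visible.
    print(f"{name}: {text!r}")
```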

If you specify the source data as JSON format, Polaris only ingests the event value:

{"city": "San Francisco"}

However, if you don't set a default timestamp, Polaris treats the record as unparseable because the event value doesn't contain a timestamp. With the Kafka format, Polaris can use the Kafka event timestamp as the record timestamp and still ingest the data in the event value.

This topic shows you how to use the Kafka source format to ingest Kafka event data and its corresponding metadata.

Prerequisites

Before continuing with this topic, ensure you have the following:

  • Familiarity with the basics of creating a streaming ingestion job.
  • A connection that describes your Kafka data source.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Map the Kafka timestamp to the primary table timestamp

Define the source format as Kafka in source.formatSettings.format of the ingestion job request payload. You must also set source.formatSettings.valueFormat to the data format of the event value. For example:

    "source": {
      ...
      "formatSettings": {
        "format": "kafka",
        "valueFormat": {
          "format": "nd-json"
        }
      }
    },

To use the Kafka fields in mappings, for example to parse the Kafka timestamp, include them in source.inputSchema. Even though the sample record shows the Kafka timestamp in ISO 8601 format, Polaris reads the timestamp as milliseconds since the epoch. Therefore, declare the Kafka timestamp in inputSchema as follows:

  • Set the name property to the same value as timestampColumnName (kafka.timestamp by default).
  • Set the dataType to long.

For example:

"inputSchema": [
  {
    "name": "kafka.timestamp",
    "dataType": "long"
  }
]
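
To make the two timestamp representations concrete, this Python sketch converts the ISO 8601 timestamp from the sample record into milliseconds since the epoch and back again. The helper names are hypothetical and the code only illustrates the arithmetic; it is not how Polaris implements the conversion.

```python
from datetime import datetime, timezone


def iso_to_epoch_millis(iso_utc: str) -> int:
    """Convert a UTC ISO 8601 string like 2023-05-18T01:02:34Z to epoch milliseconds."""
    parsed = datetime.strptime(iso_utc, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp()) * 1000


def epoch_millis_to_iso(millis: int) -> str:
    """Inverse conversion: epoch milliseconds back to a UTC ISO 8601 string."""
    parsed = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


millis = iso_to_epoch_millis("2023-05-18T01:02:34Z")
print(millis)
print(epoch_millis_to_iso(millis))
```

A long value of this kind is what the kafka.timestamp input field holds, which is why its dataType is long rather than a timestamp type.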

When mapping the Kafka timestamp to __time, apply the MILLIS_TO_TIMESTAMP operator, that is, MILLIS_TO_TIMESTAMP("kafka.timestamp"). For example:

"mappings": [
  {
    "columnName": "__time",
    "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
  }
]

Sample request

The following example request creates an ingestion job using the Kafka format and maps the Kafka timestamp to __time:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "kafka.timestamp",
        "dataType": "long"
      }
    ],
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "timestampColumnName": "kafka.timestamp"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    }
  ]
}'
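
If you generate job specs programmatically, the same payload can be assembled from a few inputs. The following Python sketch builds the request body shown in the curl example; the function name build_kafka_timestamp_job and its parameters are hypothetical, and the snippet only prints the JSON rather than calling the API.

```python
import json


def build_kafka_timestamp_job(table_name, connection_name, value_fields):
    """Return a streaming job spec that maps the Kafka timestamp to __time.

    value_fields maps each event-value field name to its data type,
    for example {"city": "string"}.
    """
    input_schema = [{"name": name, "dataType": data_type}
                    for name, data_type in value_fields.items()]
    # Declare the Kafka timestamp as a long so mappings can reference it.
    input_schema.append({"name": "kafka.timestamp", "dataType": "long"})

    mappings = [{
        "columnName": "__time",
        "expression": 'MILLIS_TO_TIMESTAMP("kafka.timestamp")',
    }]
    # Pass each event-value field through to a column of the same name.
    mappings += [{"columnName": name, "expression": f'"{name}"'}
                 for name in value_fields]

    return {
        "type": "streaming",
        "target": {"type": "table", "tableName": table_name},
        "createTableIfNotExists": True,
        "source": {
            "type": "connection",
            "connectionName": connection_name,
            "inputSchema": input_schema,
            "formatSettings": {
                "format": "kafka",
                "valueFormat": {"format": "nd-json"},
                "timestampColumnName": "kafka.timestamp",
            },
        },
        "mappings": mappings,
    }


job_spec = build_kafka_timestamp_job("example-table", "docs-demo", {"city": "string"})
payload = json.dumps(job_spec, indent=2)
print(payload)
```

The printed JSON can be passed as the --data-raw body of the curl command above.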

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T00:50:27.408275186Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T00:50:27.408275186Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
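
As a sketch of how a client might read this response, the following Python snippet pulls out a few fields that are useful to log after creating a job. It works on a trimmed copy of the sample response, and the summarize_job helper is hypothetical.

```python
import json

# Trimmed copy of the sample response; only the fields read below are kept.
response_body = """
{
  "id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
  "type": "streaming",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {"status": "ok"},
  "target": {"tableName": "example-table", "type": "table", "intervals": []}
}
"""


def summarize_job(job):
    """Collect the fields a client might log after creating a job."""
    return {
        "id": job["id"],
        "table": job["target"]["tableName"],
        "desired": job["desiredExecutionStatus"],
        "actual": job["executionStatus"],
        "healthy": job["health"]["status"] == "ok",
    }


summary = summarize_job(json.loads(response_body))
print(summary)
```

In the sample response, desiredExecutionStatus is running while executionStatus is pending, so the summary reports the two values separately.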

Map Kafka key and headers to new columns

In addition to the Kafka timestamp, events are associated with a key and, optionally, headers in the form of key-value pairs. You can extract this metadata to ingest into your table.

To parse headers, do all of the following:

  • Set the value of headerFormat in source.formatSettings to the encoding used to parse header values, such as utf-8. For example:
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "headerLabelPrefix": "kafka.header.",
      "headerFormat": "utf-8"
    }
  • Declare the header labels in source.inputSchema. For example:
    "inputSchema": [
      {
        "name": "kafka.header.Header-1",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-2",
        "dataType": "string"
      }
    ]
  • Map the header input fields to table columns in mappings. For example:
    "mappings": [
      {
        "columnName": "kafka.header.Header-1",
        "expression": "\"kafka.header.Header-1\""
      },
      {
        "columnName": "kafka.header.Header-2",
        "expression": "\"kafka.header.Header-2\""
      }
    ]

You can optionally set the prefix applied to the Kafka headers in source.formatSettings.headerLabelPrefix. Note that the default value, kafka.header., ends with a period. For instance, for headers with the keys h1 and h2, Polaris prepends the prefix so that you use kafka.header.h1 and kafka.header.h2 in the input schema and mappings.
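
The relationship between header keys, the prefix, and the resulting field names can be sketched in a few lines of Python. The helper below is hypothetical; it derives the inputSchema and mappings entries for a list of header keys, assuming every header is declared as a string.

```python
def header_schema_and_mappings(header_keys, prefix="kafka.header."):
    """Return (inputSchema entries, mappings entries) for the given header keys.

    The prefix is concatenated as-is, so it must already include any
    separator, as the default value does with its trailing period.
    """
    field_names = [f"{prefix}{key}" for key in header_keys]
    input_schema = [{"name": name, "dataType": "string"} for name in field_names]
    # Each mapping expression references the input field in double quotes.
    mappings = [{"columnName": name, "expression": f'"{name}"'} for name in field_names]
    return input_schema, mappings


schema, mappings = header_schema_and_mappings(["h1", "h2"])
for entry in schema:
    print(entry["name"])
```

With the default prefix, the keys h1 and h2 yield the field names kafka.header.h1 and kafka.header.h2.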

The key field associated with a record is typically used for partitioning in Kafka. For Polaris to parse and ingest the key, you need the following:

  • Set the value of keyFormat in source.formatSettings. The keyFormat field takes an object that describes the data format settings used to parse the key. If you store the key as a string, use the CSV input format. You must also set columns for Polaris to parse the key. For example:
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "keyColumnName": "kafka.key",
      "keyFormat": {
        "columns": [
          "key"
        ],
        "format": "csv"
      }
    }
  • Declare the key in source.inputSchema. For example:
    "inputSchema": [
      {
        "name": "kafka.key",
        "dataType": "string"
      }
    ]
  • Map the key input field to a table column in mappings. For example:
    "mappings": [
      {
        "columnName": "kafka.key",
        "expression": "\"kafka.key\""
      }
    ]

You can optionally set the name of the Kafka key input field in source.formatSettings.keyColumnName. Use the value of this field when declaring the key in the input schema.

For additional information on specifying the Kafka key format, see the Jobs API documentation.
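
Because the key name appears in three places, it can help to derive all three fragments from a single value. The Python sketch below does this for a string key parsed as CSV, following the settings described above; the key_settings helper and the custom name kafka.recordKey are hypothetical.

```python
def key_settings(key_column_name="kafka.key"):
    """Return (formatSettings fragment, inputSchema entry, mapping) for a string key."""
    format_fragment = {
        "keyColumnName": key_column_name,
        # CSV with a single declared column, as described for string keys.
        "keyFormat": {"format": "csv", "columns": ["key"]},
    }
    # The input schema entry must use the same name as keyColumnName.
    schema_entry = {"name": key_column_name, "dataType": "string"}
    mapping = {"columnName": key_column_name, "expression": f'"{key_column_name}"'}
    return format_fragment, schema_entry, mapping


fragment, schema_entry, mapping = key_settings("kafka.recordKey")
print(fragment["keyColumnName"], schema_entry["name"], mapping["expression"])
```

Merge the first fragment into source.formatSettings alongside format and valueFormat, and append the other two to inputSchema and mappings.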

Sample request

The following example request creates an ingestion job using the Kafka format and creates new columns for the Kafka key and headers:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "kafka.timestamp",
        "dataType": "long"
      },
      {
        "name": "kafka.key",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-1",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-2",
        "dataType": "string"
      }
    ],
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "timestampColumnName": "kafka.timestamp",
      "headerLabelPrefix": "kafka.header.",
      "headerFormat": "utf-8",
      "keyColumnName": "kafka.key",
      "keyFormat": {
        "columns": [
          "key"
        ],
        "format": "csv"
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "kafka.key",
      "expression": "\"kafka.key\""
    },
    {
      "columnName": "kafka.header.Header-1",
      "expression": "\"kafka.header.Header-1\""
    },
    {
      "columnName": "kafka.header.Header-2",
      "expression": "\"kafka.header.Header-2\""
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T16:37:36.193117466Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "e24b4162-7f86-4e35-8cc8-403b9bf6ccd6",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T16:37:36.193117466Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}

Learn more

See the following topics for more information: