Ingest Kinesis records and metadata by API

When ingesting data from Amazon Kinesis streams into Imply Polaris, you can ingest event metadata, specifically the approximate arrival timestamp and the partition key.

For example, consider the following record produced to a Kinesis stream:

{
  "SequenceNumber": "49545115243490985018280067714973144582180062593244200961",
  "PartitionKey": "partitionKey-1",
  "Data": "SGVsbG8sIFdvcmxkIQ==", // Base64-encoded string
  "ApproximateArrivalTimestamp": 1609459200.0
}

The event value in this record doesn't contain a timestamp, so unless you set a default timestamp, Polaris treats the record as unparseable. You can use the Kinesis ApproximateArrivalTimestamp as the Polaris timestamp and still ingest the event data itself.
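
To make the record fields concrete, the following Python sketch decodes the Data payload of the sample record and converts its ApproximateArrivalTimestamp to a readable time. It assumes the sample timestamp is expressed in seconds since the Unix epoch, as its value suggests. It only inspects the record locally and is not part of the Polaris API.

```python
import base64
from datetime import datetime, timezone

# Sample record from above, written as a Python dict.
record = {
    "SequenceNumber": "49545115243490985018280067714973144582180062593244200961",
    "PartitionKey": "partitionKey-1",
    "Data": "SGVsbG8sIFdvcmxkIQ==",
    "ApproximateArrivalTimestamp": 1609459200.0,
}

# The event value is the Base64-decoded Data field.
event_value = base64.b64decode(record["Data"]).decode("utf-8")

# Assumption: the sample timestamp is in seconds since the Unix epoch.
arrival = datetime.fromtimestamp(
    record["ApproximateArrivalTimestamp"], tz=timezone.utc
)

print(event_value)          # Hello, World!
print(arrival.isoformat())  # 2021-01-01T00:00:00+00:00
```

The decoded value carries no time field of its own, which is why the arrival timestamp from the metadata is the natural candidate for the table timestamp.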

This topic shows you how to use the Kinesis stream format to ingest Kinesis event data and its corresponding metadata.

Project-less regional API resources have been deprecated and will be removed by the end of September 2024. See Migrate to project-scoped URL for more information.

Prerequisites

Before continuing with this topic, ensure you have the following:

  • Familiarity with the basics of creating a streaming ingestion job.
  • A connection that describes your Kinesis data source.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Kinesis input format

If you want an ingestion job to ingest only event values, define the data format, such as nd-json, in source.formatSettings.format.

If you want to ingest metadata as well as the message values, specify all of the following:

  • source.formatSettings.format: set to kinesis
  • source.formatSettings.valueFormat: set to the data format of the event message

For example:

"source": {
  ...
  "formatSettings": {
    "format": "kinesis",
    "valueFormat": {
      "format": "nd-json"
    }
  }
},
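
The two cases above, values only versus values plus metadata, can be sketched as a small Python helper that produces the matching formatSettings object. The function name build_format_settings and its parameters are hypothetical and exist only for illustration; the property names come from this topic.

```python
import json


def build_format_settings(value_format: str, include_metadata: bool) -> dict:
    """Return a source.formatSettings object (hypothetical helper).

    With include_metadata=False, the event data format is set directly.
    With include_metadata=True, the kinesis format wraps the value format.
    """
    if include_metadata:
        return {
            "format": "kinesis",
            "valueFormat": {"format": value_format},
        }
    return {"format": value_format}


values_only = build_format_settings("nd-json", include_metadata=False)
with_metadata = build_format_settings("nd-json", include_metadata=True)

print(json.dumps(values_only))
print(json.dumps(with_metadata, indent=2))
```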

Map the Kinesis timestamp to the primary table timestamp

To use Kinesis metadata fields in mappings, for example to parse the Kinesis approximate arrival timestamp, include them in source.inputSchema. Declare the Kinesis timestamp in inputSchema as follows:

  • Set the name property to the same value as timestampColumnName (kinesis.timestamp by default).
  • Set the dataType to long.

For example:

"inputSchema": [
  {
    "name": "kinesis.timestamp",
    "dataType": "long"
  }
]

When mapping the timestamp to __time, apply MILLIS_TO_TIMESTAMP to the input field, that is, MILLIS_TO_TIMESTAMP("kinesis.timestamp"). For example:

"mappings": [
  {
    "columnName": "__time",
    "expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")"
  }
]
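
As a rough illustration of the declaration and mapping described above, the following Python sketch builds the __time mapping for a given timestamp column name, checks that the same name is declared as a long in the input schema, and mimics locally what converting epoch milliseconds to a timestamp yields. The helper names (time_mapping, timestamp_is_declared, millis_to_datetime) are hypothetical, and millis_to_datetime is only a local stand-in, not the Polaris implementation.

```python
import json
from datetime import datetime, timezone


def time_mapping(timestamp_column: str = "kinesis.timestamp") -> dict:
    """Build the __time mapping for the given input field (hypothetical helper)."""
    return {
        "columnName": "__time",
        "expression": f'MILLIS_TO_TIMESTAMP("{timestamp_column}")',
    }


def timestamp_is_declared(input_schema: list, timestamp_column: str) -> bool:
    """Check that the timestamp field is declared with dataType long."""
    return any(
        field["name"] == timestamp_column and field["dataType"] == "long"
        for field in input_schema
    )


def millis_to_datetime(millis: int) -> datetime:
    """Local stand-in: interpret a long as milliseconds since the Unix epoch."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


input_schema = [{"name": "kinesis.timestamp", "dataType": "long"}]
mapping = time_mapping("kinesis.timestamp")

print(timestamp_is_declared(input_schema, "kinesis.timestamp"))
# json.dumps escapes the inner quotes the same way the JSON spec requires.
print(json.dumps(mapping))
print(millis_to_datetime(1609459200000).isoformat())
```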

Sample request

The following example request creates an ingestion job using the Kinesis format and maps the Kinesis timestamp to __time:

curl --location "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "useSchemaDiscovery": true,
  "source": {
    "type": "connection",
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "name": "kinesis.timestamp",
        "dataType": "long"
      },
      {
        "name": "city",
        "dataType": "string"
      }
    ],
    "formatSettings": {
      "format": "kinesis",
      "valueFormat": {
        "format": "nd-json"
      },
      "timestampColumnName": "kinesis.timestamp"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")"
    }
  ]
}'
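
If you prefer to build the request from a script, the same call might look like the following Python sketch, which uses only the standard library. The URL placeholders and the POLARIS_API_KEY variable are taken from the curl example; the submit function is a hypothetical helper, and the call that actually sends the request is left commented out so the sketch runs without network access.

```python
import json
import os
import urllib.request

# Placeholders copied from the curl example; replace them with your own values.
URL = (
    "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io"
    "/v1/projects/PROJECT_ID/jobs"
)
API_KEY = os.environ.get("POLARIS_API_KEY", "")

job_spec = {
    "type": "streaming",
    "target": {"type": "table", "tableName": "example-table"},
    "createTableIfNotExists": True,
    "useSchemaDiscovery": True,
    "source": {
        "type": "connection",
        "connectionName": "docs-demo",
        "inputSchema": [
            {"name": "kinesis.timestamp", "dataType": "long"},
            {"name": "city", "dataType": "string"},
        ],
        "formatSettings": {
            "format": "kinesis",
            "valueFormat": {"format": "nd-json"},
            "timestampColumnName": "kinesis.timestamp",
        },
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": 'MILLIS_TO_TIMESTAMP("kinesis.timestamp")',
        }
    ],
}

# Content-Type is set explicitly because the body is JSON.
request = urllib.request.Request(
    URL,
    data=json.dumps(job_spec).encode("utf-8"),
    headers={
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": f"Basic {API_KEY}",
    },
    method="POST",
)


def submit(req: urllib.request.Request) -> dict:
    """Send the request and return the parsed JSON response (hypothetical helper)."""
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())


# Uncomment to submit the job against a real project:
# job = submit(request)
```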

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
  "source": {
    "connectionName": "docs-demo",
    "formatSettings": {
      "valueFormat": {
        "flattenSpec": null,
        "format": "nd-json"
      },
      "partitionKeyColumnName": "kinesis.partitionKey",
      "timestampColumnName": "kinesis.timestamp",
      "format": "kinesis"
    },
    "inputSchema": [
      {
        "dataType": "long",
        "name": "kinesis.timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      }
    ],
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")",
      "isAggregation": null
    },
    {
      "columnName": "city",
      "expression": "\"city\"",
      "isAggregation": null
    }
  ],
  "dimensionExclusions": [],
  "useSchemaDiscovery": true,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "earlyMessageRejectionPeriod": "P2000D",
  "readFromPoint": "latest",
  "maxParseExceptions": 2147483647,
  "createdBy": {
    "username": "api-key-pok_ajqxs...nheaas",
    "userId": "59593c3b-cb3d-4695-be11-d6df94444032"
  },
  "createdTimestamp": "2024-08-12T20:50:11.734929Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "0191485b-0a96-7b29-84f6-f9081eb6602b",
  "lastModifiedBy": {
    "username": "api-key-pok_ajqxs...nheaas",
    "userId": "59593c3b-cb3d-4695-be11-d6df94444032"
  },
  "lastUpdatedTimestamp": "2024-08-12T20:50:11.734929Z",
  "spec": {
    "source": {
      "connectionName": "docs-demo",
      "formatSettings": {
        "valueFormat": {
          "flattenSpec": null,
          "format": "nd-json"
        },
        "partitionKeyColumnName": "kinesis.partitionKey",
        "timestampColumnName": "kinesis.timestamp",
        "format": "kinesis"
      },
      "inputSchema": [
        {
          "dataType": "long",
          "name": "kinesis.timestamp"
        },
        {
          "dataType": "string",
          "name": "city"
        }
      ],
      "type": "connection"
    },
    "target": {
      "tableName": "example-table",
      "type": "table",
      "intervals": null
    },
    "clusteringColumns": [],
    "createTableIfNotExists": true,
    "dimensionExclusions": [],
    "earlyMessageRejectionPeriod": "P2000D",
    "enableUpserts": false,
    "filterExpression": null,
    "includeAllDimensions": null,
    "lateMessageRejectionPeriod": "P30D",
    "mappings": [
      {
        "columnName": "__time",
        "expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")",
        "isAggregation": null
      },
      {
        "columnName": "city",
        "expression": "\"city\"",
        "isAggregation": null
      }
    ],
    "maxParseExceptions": 2147483647,
    "partitionedBy": null,
    "readFromPoint": "latest",
    "replaceRunning": false,
    "upsertKeyColumns": null,
    "upsertVersionColumn": null,
    "useSchemaDiscovery": true,
    "type": "streaming"
  },
  "target": {
    "tableName": "example-table",
    "type": "table",
    "intervals": null
  },
  "type": "streaming",
  "completedTimestamp": null,
  "startedTimestamp": null
}
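
A client typically needs only a few fields from this response, such as the job ID and its status. The following Python sketch reads them from an abbreviated copy of the sample response; the summary dict and its key names are hypothetical and chosen only for illustration.

```python
import json

# Abbreviated copy of the sample response, keeping only the fields read below.
response_body = """
{
  "id": "0191485b-0a96-7b29-84f6-f9081eb6602b",
  "type": "streaming",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {"status": "ok"},
  "spec": {
    "source": {
      "formatSettings": {
        "format": "kinesis",
        "partitionKeyColumnName": "kinesis.partitionKey",
        "timestampColumnName": "kinesis.timestamp"
      }
    }
  }
}
"""

job = json.loads(response_body)
settings = job["spec"]["source"]["formatSettings"]

summary = {
    "job_id": job["id"],
    "status": job["executionStatus"],
    "health": job["health"]["status"],
    "timestamp_column": settings["timestampColumnName"],
    "partition_key_column": settings["partitionKeyColumnName"],
}

for key, value in summary.items():
    print(f"{key}: {value}")
```

In the sample response, partitionKeyColumnName appears with the value kinesis.partitionKey even though the request did not set it.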

Map Kinesis partition key

In addition to the Kinesis timestamp, each event is associated with a partition key. Kinesis uses the partition key to determine which shard a record belongs to. You can extract this metadata and ingest it into your table.

Declare the key in source.inputSchema. For example:

"inputSchema": [
  {
    "name": "kinesis.partitionKey",
    "dataType": "string"
  }
]

Map the key input field to a table column in mappings. For example:

"mappings": [
  {
    "columnName": "kinesis.partitionKey",
    "expression": "\"kinesis.partitionKey\""
  }
]

Alternatively, you can use schema auto-discovery to let Polaris discover the partition key column.
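
For the explicit approach, the declaration and mapping steps can be combined in one place. The following Python sketch adds both entries to an existing job spec dict without modifying the original; add_partition_key is a hypothetical helper, and the default column name is the one shown in this topic.

```python
import copy


def add_partition_key(job_spec: dict, column: str = "kinesis.partitionKey") -> dict:
    """Return a copy of job_spec that declares and maps the partition key.

    Hypothetical helper: entries are appended only if they are not present.
    """
    spec = copy.deepcopy(job_spec)

    # Declare the key in source.inputSchema as a string field.
    schema = spec["source"].setdefault("inputSchema", [])
    if not any(field["name"] == column for field in schema):
        schema.append({"name": column, "dataType": "string"})

    # Map the input field to a table column of the same name.
    mappings = spec.setdefault("mappings", [])
    if not any(m["columnName"] == column for m in mappings):
        mappings.append({"columnName": column, "expression": f'"{column}"'})

    return spec


base_spec = {
    "source": {
        "inputSchema": [{"name": "kinesis.timestamp", "dataType": "long"}]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": 'MILLIS_TO_TIMESTAMP("kinesis.timestamp")',
        }
    ],
}

updated_spec = add_partition_key(base_spec)
print(updated_spec["source"]["inputSchema"])
print(updated_spec["mappings"])
```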

Learn more

See the following topics for more information: