Ingest Kinesis records and metadata by API
When ingesting data from Amazon Kinesis streams into Imply Polaris, you can ingest event metadata, specifically the approximate arrival timestamp and the partition key.
For example, consider the following record produced to a Kinesis stream:
{
"SequenceNumber": "49545115243490985018280067714973144582180062593244200961",
"PartitionKey": "partitionKey-1",
"Data": "SGVsbG8sIFdvcmxkIQ==", // Base64-encoded string
"ApproximateArrivalTimestamp": 1609459200.0
}
If you don't set a default timestamp, Polaris treats the record as unparseable because the event value doesn't contain a timestamp.
You can use the Kinesis ApproximateArrivalTimestamp to fill in the timestamp for Polaris as well as ingest the data in the event data.
This topic shows you how to use the Kinesis stream format to ingest Kinesis event data and its corresponding metadata.
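To see why the sample record needs the metadata timestamp, the following sketch decodes the record's Data field and converts ApproximateArrivalTimestamp from epoch seconds to epoch milliseconds. It only inspects the record locally and does not call Polaris. The assumption that the metadata column holds epoch milliseconds is inferred from the MILLIS_TO_TIMESTAMP mapping used later in this topic.

```python
import base64
from datetime import datetime, timezone

# The sample record from this topic, as a Python dictionary.
record = {
    "SequenceNumber": "49545115243490985018280067714973144582180062593244200961",
    "PartitionKey": "partitionKey-1",
    "Data": "SGVsbG8sIFdvcmxkIQ==",
    "ApproximateArrivalTimestamp": 1609459200.0,
}

# Decode the Base64 payload. The event value carries no timestamp of its own.
event_value = base64.b64decode(record["Data"]).decode("utf-8")

# Convert the arrival time from epoch seconds to epoch milliseconds
# (assumed unit of the Kinesis timestamp metadata column).
arrival_millis = int(record["ApproximateArrivalTimestamp"] * 1000)
arrival_time = datetime.fromtimestamp(arrival_millis / 1000, tz=timezone.utc)

print(event_value)               # Hello, World!
print(arrival_millis)            # 1609459200000
print(arrival_time.isoformat())  # 2021-01-01T00:00:00+00:00
```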
Prerequisites
Before continuing with this topic, ensure you have the following:
- Familiarity with the basics of creating a streaming ingestion job.
- A connection that describes your Kinesis data source.
- An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Kinesis input format
If you want an ingestion job to ingest only event values, define the data format, such as nd-json, in source.formatSettings.format.
If you want to ingest metadata as well as the message values, specify all of the following:
- source.formatSettings.format: set to kinesis
- source.formatSettings.valueFormat: set to the data format of the event message
For example:
"source": {
...
"formatSettings": {
"format": "kinesis",
"valueFormat": {
"format": "nd-json"
}
}
}
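The two cases above differ only in where the data format is declared. As a minimal sketch, the hypothetical helper below (build_format_settings is not part of any Polaris client) builds either shape of formatSettings so the difference is easy to compare:

```python
import json


def build_format_settings(value_format, include_metadata=True):
    """Return a formatSettings dictionary for the job's source (illustrative helper)."""
    if not include_metadata:
        # Event values only: the data format goes directly in "format".
        return {"format": value_format}
    # Event values plus Kinesis metadata: "format" is "kinesis" and the
    # data format of the event message moves into "valueFormat".
    return {"format": "kinesis", "valueFormat": {"format": value_format}}


print(json.dumps(build_format_settings("nd-json"), indent=2))
print(json.dumps(build_format_settings("nd-json", include_metadata=False), indent=2))
```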
Map the Kinesis timestamp to the primary table timestamp
To use the Kinesis fields in mappings, such as to parse the Kinesis approximate arrival timestamp, include them in source.inputSchema.
Therefore, declare the Kinesis timestamp in the inputSchema as follows:
- Set the name property to the same value as timestampColumnName (kinesis.timestamp by default).
- Set the dataType to long.
For example:
"inputSchema": [
{
"name": "kinesis.timestamp",
"dataType": "long"
}
]
When mapping the timestamp to __time, apply the MILLIS_TO_TIMESTAMP operator, that is, MILLIS_TO_TIMESTAMP("kinesis.timestamp").
For example:
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")"
}
]
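The input schema entry and the __time mapping must refer to the same column name. The following sketch generates both from one value so they stay in sync if you set a custom timestampColumnName. The helper name timestamp_schema_and_mapping is hypothetical and used only for illustration.

```python
import json


def timestamp_schema_and_mapping(timestamp_column_name="kinesis.timestamp"):
    """Return the inputSchema entry and the __time mapping for the Kinesis timestamp."""
    input_field = {"name": timestamp_column_name, "dataType": "long"}
    mapping = {
        "columnName": "__time",
        # The column name is double-quoted inside the expression.
        "expression": f'MILLIS_TO_TIMESTAMP("{timestamp_column_name}")',
    }
    return input_field, mapping


input_field, mapping = timestamp_schema_and_mapping()

# json.dumps escapes the inner double quotes, matching the job spec examples.
print(json.dumps({"inputSchema": [input_field], "mappings": [mapping]}, indent=2))
```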
Sample request
The following example request creates an ingestion job using the Kinesis format and maps the Kinesis timestamp to __time:
- cURL
- Python
curl --location "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Content-Type: application/json" \
--header "Accept: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"useSchemaDiscovery": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "kinesis.timestamp",
"dataType": "long"
},
{
"name": "city",
"dataType": "string"
}
],
"formatSettings": {
"format": "kinesis",
"valueFormat": {
"format": "nd-json"
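},
"timestampColumnName": "kinesis.timestamp",
"partitionKeyColumnName": "kinesis.partitionKey"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]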
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"useSchemaDiscovery": True,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "kinesis.timestamp",
"dataType": "long"
},
{
"name": "city",
"dataType": "string"
}
],
"formatSettings": {
"format": "kinesis",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kinesis.timestamp",
"partitionKeyColumnName": "kinesis.partitionKey"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
})
headers = {
'Content-Type': 'application/json',
'Authorization': f'Basic {apikey}'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the ingestion job details.
{
"source": {
"connectionName": "docs-demo",
"formatSettings": {
"valueFormat": {
"flattenSpec": null,
"format": "nd-json"
},
"partitionKeyColumnName": "kinesis.partitionKey",
"timestampColumnName": "kinesis.timestamp",
"format": "kinesis"
},
"inputSchema": [
{
"dataType": "long",
"name": "kinesis.timestamp"
},
{
"dataType": "string",
"name": "city"
}
],
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": true,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_ajqxs...nheaas",
"userId": "59593c3b-cb3d-4695-be11-d6df94444032"
},
"createdTimestamp": "2024-08-12T20:50:11.734929Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "0191485b-0a96-7b29-84f6-f9081eb6602b",
"lastModifiedBy": {
"username": "api-key-pok_ajqxs...nheaas",
"userId": "59593c3b-cb3d-4695-be11-d6df94444032"
},
"lastUpdatedTimestamp": "2024-08-12T20:50:11.734929Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"formatSettings": {
"valueFormat": {
"flattenSpec": null,
"format": "nd-json"
},
"partitionKeyColumnName": "kinesis.partitionKey",
"timestampColumnName": "kinesis.timestamp",
"format": "kinesis"
},
"inputSchema": [
{
"dataType": "long",
"name": "kinesis.timestamp"
},
{
"dataType": "string",
"name": "city"
}
],
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": null
},
"clusteringColumns": [],
"createTableIfNotExists": true,
"dimensionExclusions": [],
"earlyMessageRejectionPeriod": "P2000D",
"enableUpserts": false,
"filterExpression": null,
"includeAllDimensions": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kinesis.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"partitionedBy": null,
"readFromPoint": "latest",
"replaceRunning": false,
"upsertKeyColumns": null,
"upsertVersionColumn": null,
"useSchemaDiscovery": true,
"type": "streaming"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": null
},
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
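The response is long, and most of it echoes the job spec. As a rough sketch, the hypothetical helper below (summarize_job is an illustrative name) extracts the fields that are most useful for tracking a new job. It reads only keys that appear in the sample response above.

```python
def summarize_job(job):
    """Pick out the fields most useful for tracking a newly created job."""
    return {
        "id": job["id"],
        "table": job["target"]["tableName"],
        "executionStatus": job["executionStatus"],
        "desiredExecutionStatus": job["desiredExecutionStatus"],
        "health": job["health"]["status"],
    }


# Trimmed copy of the sample response from this topic.
sample_response = {
    "id": "0191485b-0a96-7b29-84f6-f9081eb6602b",
    "type": "streaming",
    "desiredExecutionStatus": "running",
    "executionStatus": "pending",
    "health": {"status": "ok"},
    "target": {"tableName": "example-table", "type": "table", "intervals": None},
}

summary = summarize_job(sample_response)
print(summary)
```

With the requests library, you could check that response.status_code equals 201 and then pass response.json() to the helper.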
Map Kinesis partition key
In addition to the Kinesis timestamp, events are associated with a partition key. Kinesis uses the partition key to determine which shard a record belongs to. You can extract this metadata to ingest into your table.
Declare the key in source.inputSchema.
For example:
"inputSchema": [
{
"name": "kinesis.partitionKey",
"dataType": "string"
}
]
Map the key input field to a table column in mappings.
For example:
"mappings": [
{
"columnName": "kinesis.partitionKey",
"expression": "\"kinesis.partitionKey\""
}
]
Alternatively, you can use schema auto-discovery to let Polaris discover the partition key column.
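If you declare the partition key explicitly, the input schema entry and the mapping go together. The sketch below adds both to an existing job spec dictionary. The helper name add_partition_key is hypothetical, and the sketch assumes the column name matches partitionKeyColumnName in formatSettings (kinesis.partitionKey in the sample request).

```python
import copy


def add_partition_key(job_spec, column_name="kinesis.partitionKey"):
    """Return a copy of job_spec that declares and maps the Kinesis partition key."""
    spec = copy.deepcopy(job_spec)  # leave the caller's dictionary untouched
    # Declare the metadata field as a string input field.
    spec["source"]["inputSchema"].append({"name": column_name, "dataType": "string"})
    # Map the input field to a table column of the same name.
    spec["mappings"].append(
        {"columnName": column_name, "expression": f'"{column_name}"'}
    )
    return spec


# Minimal job spec fragment containing only the keys this helper touches.
base_spec = {
    "source": {"inputSchema": [{"name": "kinesis.timestamp", "dataType": "long"}]},
    "mappings": [
        {"columnName": "__time", "expression": 'MILLIS_TO_TIMESTAMP("kinesis.timestamp")'}
    ],
}

updated_spec = add_partition_key(base_spec)
print(updated_spec["source"]["inputSchema"][-1])
print(updated_spec["mappings"][-1])
```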
Learn more
See the following topics for more information: