
Ingest data from Amazon Kinesis by API

info

Project-less regional API resources have been deprecated and will be removed by the end of September 2024.

You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023. For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID

Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
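
The project-scoped URL pattern above can be assembled from its four parts. The following is a minimal sketch: the function name polaris_base_url and the sample values are illustrative assumptions, not part of the Polaris API.

```shell
# Build the project-scoped base URL for regional Polaris API calls.
# $1 = organization name, $2 = region, $3 = cloud provider, $4 = project ID
polaris_base_url() {
  printf 'https://%s.%s.%s.api.imply.io/v1/projects/%s\n' "$1" "$2" "$3" "$4"
}

# Example with made-up placeholder values:
polaris_base_url "example-org" "us-east-1" "aws" "example-project-id"
```

Storing the result in a variable such as BASE_URL keeps later requests short, since each endpoint is appended to the same prefix.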

You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Amazon Kinesis. Amazon Kinesis is a real-time data processing platform provided by Amazon Web Services.

This topic explains how to create a connection to a Kinesis data stream and ingest data from the stream into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.

tip

For an end-to-end guide on Kinesis ingestion in Polaris, see Guide for Kinesis ingestion.

Prerequisites

Before you create a connection to ingest from Kinesis, complete the following:

  • Confirm that you have a Kinesis stream with data.

  • Verify that your events are in a supported format and that the event timestamps are within 30 days of ingestion time. If you need to ingest events older than 30 days, configure lateMessageRejectionPeriod accordingly in your streaming ingestion job.

  • Review Connect to Amazon Kinesis for the required information to create the connection.

  • Ensure your AWS IAM role has permission to access your data and grants permission for Imply to assume your role. For more information, see Secure connection to AWS and Get Imply role ARN to grant access to AWS resources.

  • If you don't have one already, create a Polaris API key with the ManageConnections permission. If you plan to create tables or ingestion jobs, you also need ManageTables and ManageIngestionJobs, respectively. For more information on permissions, visit Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
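
As a rough illustration of the two job settings mentioned above, the sketch below prints createTableIfNotExists and lateMessageRejectionPeriod as a JSON fragment. The helper names days_to_period and job_option_fields are hypothetical. The period uses the same ISO 8601 duration form (for example, P30D) that Polaris returns in job responses.

```shell
# Convert a whole number of days into an ISO 8601 duration such as P90D.
days_to_period() {
  case "$1" in
    ''|*[!0-9]*) echo "days must be a non-negative integer" >&2; return 1 ;;
  esac
  printf 'P%sD\n' "$1"
}

# Print the two job-spec fields as a JSON fragment.
# Merge them into a full streaming job spec before sending it.
job_option_fields() {
  period=$(days_to_period "$1") || return 1
  printf '"createTableIfNotExists": true,\n"lateMessageRejectionPeriod": "%s"\n' "$period"
}

# Accept events up to 90 days old (an example value, not a recommendation).
job_option_fields 90
```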

Create a connection to Amazon Kinesis

When creating a connection to a Kinesis data stream, specify the stream name and endpoint as well as details of the AWS role for access to the Kinesis stream. For the requisite information to connect to a Kinesis stream from Polaris, see Connect to Amazon Kinesis. Each connection is associated with a single source of data. If you plan to ingest data from multiple streams, create a new connection for each one.

Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection. For a description of the required parameters, see the Connections API documentation.
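
Because each connection maps to a single stream, ingesting from several streams means building one payload per stream and sending each in its own POST request. The sketch below only prints the payloads. The function name, the conn- prefix, and the stream names are hypothetical, and the function does not escape special characters in its arguments.

```shell
# Print a Kinesis connection payload as compact JSON.
# $1 = connection name, $2 = stream name, $3 = role ARN, $4 = Kinesis endpoint
kinesis_connection_payload() {
  printf '{"type":"kinesis","name":"%s","awsAssumedRoleArn":"%s","awsEndpoint":"%s","stream":"%s"}\n' \
    "$1" "$3" "$4" "$2"
}

ROLE_ARN="arn:aws:iam::012345678901:role/demo-role"
ENDPOINT="kinesis.us-east-1.amazonaws.com"

# One payload per stream; these stream names are made up for illustration.
for stream in demo-stream-a demo-stream-b; do
  kinesis_connection_payload "conn-$stream" "$stream" "$ROLE_ARN" "$ENDPOINT"
done
```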

Sample request

The following example request creates a connection named stream-wiki to the Kinesis stream demo-stream:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--user "${POLARIS_API_KEY}:" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "kinesis",
"name": "stream-wiki",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}'

Sample response

A successful request returns a 200 OK response and the details of the new connection, for example:

{
"type": "kinesis",
"name": "stream-wiki",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2022-10-08T00:30:13Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2022-10-08T00:30:13Z",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}
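
When scripting this call, it can help to check the HTTP status before using the response body. The following is a sketch under stated assumptions: BASE_URL holds the project-scoped URL, payload.json and response.json are hypothetical file names, and the function names are illustrative. It relies on curl's --write-out option to print the status code.

```shell
# Return success only when the HTTP status matches the expected code.
# $1 = expected code, $2 = actual code
expect_status() {
  if [ "$1" = "$2" ]; then
    return 0
  fi
  echo "unexpected HTTP status: got $2, expected $1" >&2
  return 1
}

# Send the connection request, save the body to response.json,
# and print only the HTTP status code.
create_connection() {
  curl --silent --output response.json --write-out '%{http_code}' \
    --request POST "${BASE_URL}/connections" \
    --user "${POLARIS_API_KEY}:" \
    --header "Content-Type: application/json" \
    --data @payload.json
}

# Usage (makes a real API call, so it is left commented out):
# status=$(create_connection) && expect_status 200 "$status" && cat response.json
```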

Start an ingestion job

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to start a streaming ingestion job. Include the connection name, table name, and details about the input data in the request payload.

Use schema auto-discovery in your streaming ingestion job to have Polaris automatically detect the schema of the input data and map its fields directly to columns in the destination table.

See the Jobs v1 API documentation for a description of required parameters.

Sample request

The following example request creates an ingestion job for the wikipedia-kinesis table using the stream-wiki connection:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user "${POLARIS_API_KEY}:" \
--header "Content-Type: application/json" \
--data '{
"type": "streaming",
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
}
]
},
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
}
],
"createTableIfNotExists": true,
"useSchemaDiscovery": true
}'
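
To reuse this request body for other connections and tables, the payload can be generated from two parameters. This is a minimal sketch: the function name streaming_job_payload is hypothetical, the structure copies the sample request, and the arguments are inserted without JSON escaping.

```shell
# Print a streaming job spec for a given connection and table.
# $1 = connection name, $2 = destination table name
streaming_job_payload() {
  cat <<EOF
{
  "type": "streaming",
  "source": {
    "type": "connection",
    "connectionName": "$1",
    "formatSettings": { "format": "nd-json" },
    "inputSchema": [ { "name": "timestamp", "dataType": "string" } ]
  },
  "target": { "type": "table", "tableName": "$2" },
  "mappings": [
    { "columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")" }
  ],
  "createTableIfNotExists": true,
  "useSchemaDiscovery": true
}
EOF
}

streaming_job_payload "stream-wiki" "wikipedia-kinesis"
```

The output can be piped to curl with --data @- in place of the inline body.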

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
"source": {
"connectionName": "stream-wiki",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
}
],
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": true,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-12T23:17:39.142369067Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a8baf-a6c6-721f-b35a-284ffc8dce90",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-12T23:17:39.142369067Z",
"spec": {
"source": {
"connectionName": "stream-wiki",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
}
],
"type": "connection"
},
"target": {
"tableName": "wikipedia-kinesis",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": true,
"replaceRunning": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "wikipedia-kinesis",
"type": "table",
"intervals": []
},
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
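
The id and executionStatus fields in this response are the ones a script would typically read next. The sketch below extracts them with sed and then polls the job. It assumes that a GET request to the jobs endpoint followed by the job ID returns the same job object, and that BASE_URL holds the project-scoped URL. The function names are hypothetical. The sed pattern is a rough line-based match; a JSON parser such as jq is more robust where available.

```shell
# Print the first string value found for a key in JSON read from stdin.
# $1 = key name, matched exactly including its opening quote
json_string_field() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -n 1
}

# Poll the job every 10 seconds until it leaves the "pending" state.
# $1 = job ID taken from the "id" field of the create-job response
wait_for_job() {
  while :; do
    status=$(curl --silent --user "${POLARIS_API_KEY}:" "${BASE_URL}/jobs/$1" \
      | json_string_field executionStatus)
    echo "executionStatus: $status"
    [ "$status" != "pending" ] && break
    sleep 10
  done
}

# Usage (makes real API calls, so it is left commented out):
# job_id=$(json_string_field id < response.json) && wait_for_job "$job_id"
```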

Learn more

See the following topics for more information: