Ingest data from Amazon Kinesis by API
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Amazon Kinesis. Amazon Kinesis is a real-time data processing platform provided by Amazon Web Services.
Polaris supports ingestion from Amazon Kinesis only when your cloud provider is AWS.
This topic explains how to create a connection to a Kinesis data stream and ingest data from the stream into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For an end-to-end guide on Kinesis ingestion in Polaris, see Guide for Kinesis ingestion.
Prerequisites
Before you create a connection to ingest from Kinesis, complete the following:
- Confirm that you have a Kinesis stream with data.
- Verify that your events are in a supported format and that the event timestamps are within 30 days of ingestion time. If you need to ingest events older than 30 days, configure lateMessageRejectionPeriod accordingly in your streaming ingestion job.
- Review Connect to Amazon Kinesis for the required information to create the connection.
- Ensure your AWS IAM role has permission to access your data and grants permission for Imply to assume your role. For more information, see Secure connection to AWS and Get Imply role ARN to grant access to AWS resources.
- If you don't have one already, create a Polaris API key with the ManageConnections permission. If you plan to create tables or ingestion jobs, you also need ManageTables and ManageIngestionJobs, respectively. For more information on permissions, visit Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
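As a minimal sketch of the two job spec settings mentioned above, the following Python snippet builds a streaming job payload that sets createTableIfNotExists and overrides lateMessageRejectionPeriod. The helper name build_streaming_job_spec is hypothetical, and the value P60D is an assumption that follows the ISO 8601 duration format of the default P30D shown in the sample job response later in this topic. Check the Jobs v1 API documentation for the accepted values.

```python
import json


def build_streaming_job_spec(connection_name, table_name, late_rejection_period="P30D"):
    """Return a streaming ingestion job spec as a plain dict (hypothetical helper)."""
    return {
        "type": "streaming",
        "source": {
            "type": "connection",
            "connectionName": connection_name,
            "formatSettings": {"format": "nd-json"},
            "inputSchema": [{"name": "timestamp", "dataType": "string"}],
        },
        "target": {"type": "table", "tableName": table_name},
        "mappings": [
            {"columnName": "__time", "expression": 'TIME_PARSE("timestamp")'}
        ],
        # Let Polaris create the table from the job spec if it does not exist.
        "createTableIfNotExists": True,
        "useSchemaDiscovery": True,
        # Assumed ISO 8601 duration; widens the window for late events.
        "lateMessageRejectionPeriod": late_rejection_period,
    }


spec = build_streaming_job_spec(
    "stream-wiki", "wikipedia-kinesis", late_rejection_period="P60D"
)
print(json.dumps(spec, indent=2))
```

The resulting dictionary can be serialized and sent as the body of the job request described in Start an ingestion job.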
Create a connection to Amazon Kinesis
When creating a connection to a Kinesis data stream, specify the stream name and endpoint as well as details of the AWS role for access to the Kinesis stream. For the requisite information to connect to a Kinesis stream from Polaris, see Connect to Amazon Kinesis. Each connection is associated with a single source of data. If you plan to ingest data from multiple streams, create a new connection for each one.
Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.
For a description of the required parameters, see the Connections API documentation.
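Because each connection maps to a single stream, ingesting from several streams means sending one request body per stream. The following sketch generates those bodies in Python. The helper name, the second stream name demo-stream-2, and the conn- naming scheme are hypothetical and only illustrate the one-connection-per-stream rule.

```python
def build_connection_payloads(streams, role_arn, endpoint):
    """Return one Kinesis connection payload per stream (hypothetical helper)."""
    payloads = []
    for stream in streams:
        payloads.append({
            "type": "kinesis",
            # Hypothetical naming scheme: derive the connection name from the stream.
            "name": f"conn-{stream}",
            "awsAssumedRoleArn": role_arn,
            "awsEndpoint": endpoint,
            "stream": stream,
        })
    return payloads


payloads = build_connection_payloads(
    ["demo-stream", "demo-stream-2"],
    role_arn="arn:aws:iam::012345678901:role/demo-role",
    endpoint="kinesis.us-east-1.amazonaws.com",
)
for payload in payloads:
    print(payload["name"], "->", payload["stream"])
```

Each payload would then be sent in its own POST request to the connections endpoint.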
Sample request
The following example request creates a connection named stream-wiki to stream demo-stream from Kinesis:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "kinesis",
  "name": "stream-wiki",
  "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
  "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
  "stream": "demo-stream"
}'
Python:
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
  "type": "kinesis",
  "name": "stream-wiki",
  "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
  "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
  "stream": "demo-stream"
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the successful connection, for example:
{
    "type": "kinesis",
    "name": "stream-wiki",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "submittedOnTimestamp": "2022-10-08T00:30:13Z",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "modifiedOnTimestamp": "2022-10-08T00:30:13Z",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
    "stream": "demo-stream"
}
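One way to confirm that the connection was created as intended is to compare the fields in the response with the values you submitted. The sketch below does this against an excerpt of the sample response above. The function name find_mismatches is hypothetical, and in practice you would pass response.text from the request instead of the hard-coded string.

```python
import json

# Excerpt of the sample response shown above.
sample_response = """
{
    "type": "kinesis",
    "name": "stream-wiki",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
    "stream": "demo-stream"
}
"""

# The values submitted in the sample request.
submitted = {
    "type": "kinesis",
    "name": "stream-wiki",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
    "stream": "demo-stream",
}


def find_mismatches(response_text, expected):
    """Return {field: (expected, actual)} for every field that differs."""
    actual = json.loads(response_text)
    return {
        key: (value, actual.get(key))
        for key, value in expected.items()
        if actual.get(key) != value
    }


mismatches = find_mismatches(sample_response, submitted)
print("Mismatched fields:", mismatches)
```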
Start an ingestion job
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to start a streaming ingestion job.
Include the connection name, table name, and details about the input data in the request payload.
Use schema auto-discovery in your streaming ingestion job so that Polaris automatically detects the schema of the input data and maps the discovered fields directly to columns in the destination table. Note that discovery includes Kinesis metadata, such as the partition key and the Kinesis server-side timestamp (ApproximateArrivalTimestamp). If you don't want to ingest these columns, list them in dimensionExclusions in your spec.
See the Jobs v1 API documentation for a description of required parameters.
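The following sketch shows one way to add entries to dimensionExclusions in a job spec before submitting it. The helper name is hypothetical, and the two column names are placeholders only: this topic does not state the names Polaris assigns to the Kinesis metadata columns, so replace them with the names documented in Ingest Kinesis records and metadata.

```python
def with_dimension_exclusions(job_spec, columns):
    """Return a copy of job_spec with columns added to dimensionExclusions."""
    updated = dict(job_spec)
    exclusions = list(updated.get("dimensionExclusions", []))
    for column in columns:
        if column not in exclusions:  # avoid duplicate entries
            exclusions.append(column)
    updated["dimensionExclusions"] = exclusions
    return updated


base_spec = {
    "type": "streaming",
    "useSchemaDiscovery": True,
}

# Placeholder names, not the real metadata column names.
excluded_spec = with_dimension_exclusions(
    base_spec,
    ["PARTITION_KEY_COLUMN", "ARRIVAL_TIMESTAMP_COLUMN"],
)
print(excluded_spec["dimensionExclusions"])
```

The helper copies the spec rather than changing it in place, so the same base spec can be reused for jobs that keep the metadata columns.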
Sample request
The following example request creates an ingestion job for the wikipedia-kinesis table using the stream-wiki connection:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "stream-wiki",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "name": "timestamp",
                "dataType": "string"
            }
        ]
    },
    "target": {
        "type": "table",
        "tableName": "wikipedia-kinesis"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        }
    ],
    "createTableIfNotExists": true,
    "useSchemaDiscovery": true
}'
Python:
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
  "type": "streaming",
  "source": {
    "type": "connection",
    "connectionName": "stream-wiki",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "name": "timestamp",
        "dataType": "string"
      }
    ]
  },
  "target": {
    "type": "table",
    "tableName": "wikipedia-kinesis"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    }
  ],
  "createTableIfNotExists": True,
  "useSchemaDiscovery": True
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
    "source": {
        "connectionName": "stream-wiki",
        "formatSettings": {
            "flattenSpec": {},
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            }
        ],
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")",
            "isAggregation": null
        }
    ],
    "dimensionExclusions": [],
    "useSchemaDiscovery": true,
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "earlyMessageRejectionPeriod": "P2000D",
    "readFromPoint": "latest",
    "maxParseExceptions": 2147483647,
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "createdTimestamp": "2023-09-12T23:17:39.142369067Z",
    "desiredExecutionStatus": "running",
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "id": "018a8baf-a6c6-721f-b35a-284ffc8dce90",
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "lastUpdatedTimestamp": "2023-09-12T23:17:39.142369067Z",
    "spec": {
        "source": {
            "connectionName": "stream-wiki",
            "formatSettings": {
                "flattenSpec": {},
                "format": "nd-json"
            },
            "inputSchema": [
                {
                    "dataType": "string",
                    "name": "timestamp"
                }
            ],
            "type": "connection"
        },
        "target": {
            "tableName": "wikipedia-kinesis",
            "type": "table",
            "intervals": []
        },
        "createTableIfNotExists": true,
        "dimensionExclusions": [],
        "filterExpression": null,
        "lateMessageRejectionPeriod": "P30D",
        "earlyMessageRejectionPeriod": "P2000D",
        "mappings": [
            {
                "columnName": "__time",
                "expression": "TIME_PARSE(\"timestamp\")",
                "isAggregation": null
            }
        ],
        "maxParseExceptions": 2147483647,
        "readFromPoint": "latest",
        "useSchemaDiscovery": true,
        "replaceRunning": false,
        "type": "streaming",
        "desiredExecutionStatus": "running"
    },
    "target": {
        "tableName": "wikipedia-kinesis",
        "type": "table",
        "intervals": []
    },
    "type": "streaming",
    "completedTimestamp": null,
    "startedTimestamp": null
}
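The job response is large, so it can help to pull out only the fields you need to track the job. The sketch below works on an excerpt of the sample response above. The function name summarize_job is hypothetical, and comparing executionStatus with desiredExecutionStatus is an assumed way to tell whether the job has reached its target state, not a documented rule.

```python
import json

# Excerpt of the sample job response shown above.
job_response_text = """
{
    "id": "018a8baf-a6c6-721f-b35a-284ffc8dce90",
    "type": "streaming",
    "desiredExecutionStatus": "running",
    "executionStatus": "pending",
    "health": {"status": "ok"},
    "target": {"tableName": "wikipedia-kinesis", "type": "table", "intervals": []}
}
"""


def summarize_job(response_text):
    """Extract the fields most useful for tracking an ingestion job."""
    job = json.loads(response_text)
    return {
        "id": job["id"],
        "table": job["target"]["tableName"],
        "executionStatus": job["executionStatus"],
        "desiredExecutionStatus": job["desiredExecutionStatus"],
        "health": job["health"]["status"],
    }


summary = summarize_job(job_response_text)
# Assumption: the job has reached its target state when both statuses match.
reached_desired_state = (
    summary["executionStatus"] == summary["desiredExecutionStatus"]
)
print(summary)
print("Reached desired state:", reached_desired_state)
```

Keep the id value, since the Jobs v1 API identifies jobs by ID when you look them up later.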
Learn more
See the following topics for more information:
- Connect to Amazon Kinesis for reference on connecting from Amazon Kinesis to Polaris.
- Create a streaming ingestion job for details on creating streaming ingestion jobs in Polaris.
- Ingest Kinesis records and metadata to ingest Kinesis metadata along with message values.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.