Ingest data from Amazon Kinesis by API
Project-less regional API resources have been deprecated and will be removed by the end of September 2024.
You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023.
For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID
Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Amazon Kinesis. Amazon Kinesis is a real-time data processing platform provided by Amazon Web Services.
This topic shows you how to create a connection to a Kinesis data stream and ingest data from the stream into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For a list of all ingestion options, see Sources.
Prerequisites
Before you set up ingestion from Kinesis, review Ingest from Amazon Kinesis for the required information about your Kinesis data stream and AWS access management. Grant access by listing the ARN of the Imply role as a principal in your IAM role's trust policy. For more information, see Get Imply role ARN to grant access to AWS resources.
This topic assumes you have a Kinesis stream with data as well as the correct permissions for your IAM role.
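For example, a trust policy that allows the Imply role to assume your IAM role can be applied with boto3. The following is a minimal sketch, not part of the Polaris API: the role name and Imply role ARN are placeholders, so substitute the ARN you retrieve from Polaris and the name of your own IAM role.
import json
import boto3

# Placeholder values: use your own IAM role name and the Imply role ARN
# retrieved from Polaris (see Get Imply role ARN to grant access to AWS resources).
ROLE_NAME = "demo-role"
IMPLY_ROLE_ARN = "arn:aws:iam::123456789012:role/example-imply-role"

# Trust policy that lists the Imply role as a principal allowed to assume your role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": IMPLY_ROLE_ARN},
            "Action": "sts:AssumeRole"
        }
    ]
}

iam = boto3.client("iam")
iam.update_assume_role_policy(
    RoleName=ROLE_NAME,
    PolicyDocument=json.dumps(trust_policy)
)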
You also need a Polaris API key with the following permissions:
ManageTables
ManageConnections
ManageIngestionJobs
In the examples below, the key value is stored in the environment variable named POLARIS_API_KEY.
See Authenticate with API keys to obtain an API key and assign service account permissions.
For more information on permissions, see the Permissions reference.
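The cURL examples in this topic pass the key as the username for HTTP Basic authentication with an empty password (--user ${POLARIS_API_KEY}:). If you prefer not to build the Authorization header yourself, the following sketch shows an equivalent setup in Python using a requests session with an auth tuple; it is an illustration only and is not part of the examples that follow.
import os
import requests

# Read the API key from the environment, as in the examples in this topic.
apikey = os.getenv("POLARIS_API_KEY")

# Setting the auth tuple once on a session makes requests build the same
# Basic authentication header that curl's --user flag produces.
session = requests.Session()
session.auth = (apikey, "")
session.headers.update({"Content-Type": "application/json"})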
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
Create a connection to Amazon Kinesis
When you create a connection to a Kinesis data stream, specify the stream name and endpoint as well as details of the AWS role that Polaris assumes to access the stream. For the information required to connect to a Kinesis stream from Polaris, see Ingest from Amazon Kinesis. Each connection is associated with a single source of data. If you plan to ingest data from multiple streams, create a new connection for each one.
Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.
For a description of the required parameters, see the Connections API documentation.
Sample request
The following example request creates a connection named stream-wiki to the Kinesis stream demo-stream:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "kinesis",
"name": "stream-wiki",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "kinesis",
"name": "stream-wiki",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the connection, for example:
{
"type": "kinesis",
"name": "stream-wiki",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2022-10-08T00:30:13Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2022-10-08T00:30:13Z",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}
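To confirm that Polaris stored the connection, you can retrieve it by name. The following is a sketch that assumes the Connections v1 API endpoint for getting a single connection; see the Connections v1 API documentation for the exact path and response fields.
import os
import requests

# Retrieve the stream-wiki connection created above.
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/stream-wiki"
apikey = os.getenv("POLARIS_API_KEY")

headers = {
    'Authorization': f'Basic {apikey}'
}

response = requests.get(url, headers=headers)
print(response.status_code)
print(response.text)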
Start an ingestion job
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to start a streaming ingestion job.
Include the connection name, table name, and details about the input data in the request payload.
Enable schema auto-discovery in your streaming ingestion job so that Polaris automatically detects the schema of the input data and maps the discovered fields directly to columns in the destination table.
See the Jobs v1 API documentation for a description of required parameters.
Sample request
The following example request creates an ingestion job for the wikipedia-kinesis table using the stream-wiki connection:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data '{
"type": "streaming",
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
}
]
},
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
}
],
"createTableIfNotExists": true,
"useSchemaDiscovery": true
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
}
]
},
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
}
],
"createTableIfNotExists": True,
"useSchemaDiscovery": True
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"source": {
"connectionName": "stream-wiki",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
}
],
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": true,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-12T23:17:39.142369067Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a8baf-a6c6-721f-b35a-284ffc8dce90",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-12T23:17:39.142369067Z",
"spec": {
"source": {
"connectionName": "stream-wiki",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
}
],
"type": "connection"
},
"target": {
"tableName": "wikipedia-kinesis",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": true,
"replaceRunning": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "wikipedia-kinesis",
"type": "table",
"intervals": []
},
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
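A streaming job starts out pending and moves to running once Polaris provisions the ingestion tasks. To track this transition, you can poll the job using the id from the response above. The following sketch assumes the Jobs v1 API endpoint for getting a single job; see the Jobs v1 API documentation for the exact path and the possible executionStatus values.
import os
import time
import requests

# Poll the job created above until it leaves the pending state.
# The job ID comes from the "id" field in the sample response.
job_id = "018a8baf-a6c6-721f-b35a-284ffc8dce90"
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/{job_id}"
apikey = os.getenv("POLARIS_API_KEY")

headers = {
    'Authorization': f'Basic {apikey}'
}

while True:
    job = requests.get(url, headers=headers).json()
    print(job["executionStatus"])
    if job["executionStatus"] != "pending":
        break
    time.sleep(10)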
Learn more
See the following topics for more information:
- Ingest from Amazon Kinesis for reference on connecting from Amazon Kinesis to Polaris.
- Create a streaming ingestion job for details on creating streaming ingestion jobs in Polaris.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.