Ingest data from Amazon Kinesis by API
You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Amazon Kinesis. Amazon Kinesis is a real-time data processing platform provided by Amazon Web Services.
This topic describes how to create a connection to a Kinesis data stream and ingest data from the stream into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For a list of all ingestion options, see Sources.
Prerequisites
Before you set up ingestion from Kinesis, review Ingest from Amazon Kinesis for the required information regarding your Kinesis data stream and AWS access management. Ensure you grant access to the Imply role by listing the ARN of Imply's role as a principal in your trust policy. For more information, see Get Imply role ARN to grant access to AWS resources.
This topic assumes that you have a Kinesis stream that contains data and that your IAM role has the required permissions.
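As a rough illustration of the trust-policy requirement above, the following Python sketch builds a standard AWS IAM trust policy document that allows a given principal ARN to call sts:AssumeRole. The ARN used in the example is a hypothetical placeholder, not Imply's real role ARN; get the actual value as described in Get Imply role ARN to grant access to AWS resources, and check Ingest from Amazon Kinesis for any additional conditions Polaris may require.

```python
import json


def build_trust_policy(imply_role_arn: str) -> dict:
    """Return an IAM trust policy that lets the given principal assume the role.

    The principal ARN is supplied by the caller; it is a placeholder here,
    so look up the real Imply role ARN in Polaris before using the policy.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # List the Imply role ARN as the trusted principal.
                "Principal": {"AWS": imply_role_arn},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical placeholder ARN; replace it with the ARN shown in Polaris.
    policy = build_trust_policy("arn:aws:iam::111111111111:role/imply-placeholder")
    print(json.dumps(policy, indent=2))
```

You would attach a document like this as the trust relationship of the role whose ARN you later pass as awsAssumedRoleArn.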
You also need a Polaris API key with the following permissions:
- ManageTables
- ManageConnections
- ManageIngestionJobs
In the examples below, the key value is stored in the environment variable named POLARIS_API_KEY.
See Authenticate with API keys to obtain an API key and assign service account permissions.
For more information on permissions, visit Permissions reference.
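The curl examples pass the key with --user ${POLARIS_API_KEY}:, which uses HTTP Basic authentication with the API key as the username and an empty password. The following Python sketch shows the equivalent Authorization header value; the helper name basic_auth_header is hypothetical. With the requests library you can get the same header by passing auth=(apikey, "").

```python
import base64
import os


def basic_auth_header(api_key: str) -> str:
    """Build the Authorization header value that `curl --user KEY:` sends.

    HTTP Basic auth base64-encodes "username:password". The API key is the
    username and the password is empty, matching the trailing colon in curl.
    """
    token = base64.b64encode(f"{api_key}:".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


if __name__ == "__main__":
    # Read the key from the same environment variable the examples use.
    api_key = os.environ.get("POLARIS_API_KEY")
    if api_key is None:
        print("POLARIS_API_KEY is not set")
    else:
        headers = {
            "Authorization": basic_auth_header(api_key),
            "Content-Type": "application/json",
        }
        # Avoid printing the key itself; just confirm the header was built.
        print("Headers prepared:", sorted(headers))
```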
Create a table
Create a table named wikipedia-kinesis to receive the data from the ingestion job connected to the Kinesis data stream.
The table schema matches the records in the stream. The following snippet shows an example record from the stream:
{
"isRobot":true,
"channel":"#sv.wikipedia",
"timestamp":"2022-09-27T00:00:11.080Z",
"flags":"NB",
"isUnpatrolled":false,
"page":"Salo Toraut",
"diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918",
"added":31,
"comment":"Botskapande Indonesien omdirigering",
"commentLength":35,
"isNew":true,
"isMinor":false,
"delta":31,
"isAnonymous":false,
"user":"Lsjbot",
"deltaBucket":0.0,
"deleted":0,
"namespace":"Main"
}
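The ingestion job later in this topic reads the stream with the nd-json format, so each record in the stream should be a JSON object on a single line. If you produce test records yourself, the following Python sketch serializes a record like the one above to compact, single-line UTF-8 bytes, the form you might use as the data of a Kinesis record. The helper to_record_bytes is hypothetical, and writing the bytes to Kinesis (for example with an AWS SDK) is not shown.

```python
import json


def to_record_bytes(record: dict) -> bytes:
    """Serialize one event as compact, single-line JSON encoded as UTF-8."""
    # Without indent, json.dumps escapes newlines inside strings as "\n",
    # so the output always stays on one line.
    line = json.dumps(record, separators=(",", ":"), ensure_ascii=False)
    return line.encode("utf-8")


# A subset of the example record above.
sample = {
    "isRobot": True,
    "channel": "#sv.wikipedia",
    "timestamp": "2022-09-27T00:00:11.080Z",
    "page": "Salo Toraut",
    "added": 31,
    "deltaBucket": 0.0,
}

if __name__ == "__main__":
    print(to_record_bytes(sample))
```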
For more information about creating tables, see Create a table by API and the Tables v2 API reference.
The following request creates the wikipedia-kinesis table:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "detail",
"name": "wikipedia-kinesis",
"schema": [
{
"name": "isRobot",
"dataType": "string"
},
{
"name": "channel",
"dataType": "string"
},
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "flags",
"dataType": "string"
},
{
"name": "isUnpatrolled",
"dataType": "string"
},
{
"name": "page",
"dataType": "string"
},
{
"name": "diffUrl",
"dataType": "string"
},
{
"name": "added",
"dataType": "long"
},
{
"name": "comment",
"dataType": "string"
},
{
"name": "commentLength",
"dataType": "string"
},
{
"name": "isNew",
"dataType": "string"
},
{
"name": "isMinor",
"dataType": "string"
},
{
"name": "delta",
"dataType": "string"
},
{
"name": "isAnonymous",
"dataType": "string"
},
{
"name": "user",
"dataType": "string"
},
{
"name": "deltaBucket",
"dataType": "long"
},
{
"name": "deleted",
"dataType": "long"
},
{
"name": "namespace",
"dataType": "string"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "detail",
"name": "wikipedia-kinesis",
"schema": [
{
"name": "isRobot",
"dataType": "string"
},
{
"name": "channel",
"dataType": "string"
},
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "flags",
"dataType": "string"
},
{
"name": "isUnpatrolled",
"dataType": "string"
},
{
"name": "page",
"dataType": "string"
},
{
"name": "diffUrl",
"dataType": "string"
},
{
"name": "added",
"dataType": "long"
},
{
"name": "comment",
"dataType": "string"
},
{
"name": "commentLength",
"dataType": "string"
},
{
"name": "isNew",
"dataType": "string"
},
{
"name": "isMinor",
"dataType": "string"
},
{
"name": "delta",
"dataType": "string"
},
{
"name": "isAnonymous",
"dataType": "string"
},
{
"name": "user",
"dataType": "string"
},
{
"name": "deltaBucket",
"dataType": "long"
},
{
"name": "deleted",
"dataType": "long"
},
{
"name": "namespace",
"dataType": "string"
}
]
})
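headers = {
'Content-Type': 'application/json'
}
# auth=(apikey, "") sends HTTP Basic auth with the key as the username and an
# empty password, the same as --user ${POLARIS_API_KEY}: in the curl example.
response = requests.request("POST", url, auth=(apikey, ""), headers=headers, data=payload)
print(response.text)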
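The table schema repeats the same name and data type pattern for every column. As an optional sketch, you can generate the request body from a list of (name, dataType) pairs instead of writing it out by hand. The helper build_table_payload is hypothetical, and the column list simply copies the types used in this topic; send the result with the same URL and authentication as the example above.

```python
import json

# Column names and Polaris data types copied from the examples in this topic.
COLUMNS = [
    ("isRobot", "string"),
    ("channel", "string"),
    ("timestamp", "string"),
    ("flags", "string"),
    ("isUnpatrolled", "string"),
    ("page", "string"),
    ("diffUrl", "string"),
    ("added", "long"),
    ("comment", "string"),
    ("commentLength", "string"),
    ("isNew", "string"),
    ("isMinor", "string"),
    ("delta", "string"),
    ("isAnonymous", "string"),
    ("user", "string"),
    ("deltaBucket", "long"),
    ("deleted", "long"),
    ("namespace", "string"),
]


def build_table_payload(name: str, columns: list[tuple[str, str]]) -> dict:
    """Return a detail-table request body in the shape used above."""
    return {
        "type": "detail",
        "name": name,
        "schema": [{"name": col, "dataType": dtype} for col, dtype in columns],
    }


if __name__ == "__main__":
    print(json.dumps(build_table_payload("wikipedia-kinesis", COLUMNS), indent=2))
```

Keeping the column list in one place also makes it easier to reuse for the job's inputSchema.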
Create a connection to Amazon Kinesis
When you create a connection to a Kinesis data stream, specify the stream name, the endpoint, and details of the AWS role that Polaris uses to access the stream. For the information Polaris needs to connect to a Kinesis stream, see Ingest from Amazon Kinesis. Each connection is associated with a single source of data, so if you plan to ingest data from multiple streams, create a separate connection for each one.
Send a POST request to the /v2/connections endpoint to create a connection.
For a description of the required parameters, see the Connections v2 API documentation.
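The connection body needs only a handful of fields. The following Python sketch assembles them from a connection name, stream name, role ARN, and AWS Region. The helper name is hypothetical, and deriving the endpoint as kinesis.REGION.amazonaws.com is an assumption based on the standard AWS Kinesis endpoint pattern, which matches the kinesis.us-east-1.amazonaws.com value in the example.

```python
import json


def build_kinesis_connection(name: str, stream: str, role_arn: str, region: str) -> dict:
    """Return a Kinesis connection request body with the fields shown below.

    The endpoint follows the kinesis.<region>.amazonaws.com pattern; use a
    different endpoint if your stream requires one.
    """
    # Loose sanity check: IAM role ARNs start with "arn:" and contain ":role/".
    if not (role_arn.startswith("arn:") and ":role/" in role_arn):
        raise ValueError(f"expected an IAM role ARN, got {role_arn!r}")
    return {
        "type": "kinesis",
        "name": name,
        "awsAssumedRoleArn": role_arn,
        "awsEndpoint": f"kinesis.{region}.amazonaws.com",
        "stream": stream,
    }


if __name__ == "__main__":
    body = build_kinesis_connection(
        "stream-wiki",
        "demo-stream",
        "arn:aws:iam::012345678901:role/demo-role",
        "us-east-1",
    )
    print(json.dumps(body, indent=2))
```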
Sample request
The following example request creates a connection named stream-wiki to the Kinesis stream demo-stream:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "kinesis",
"name": "stream-wiki",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "kinesis",
"name": "stream-wiki",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
})
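headers = {
'Content-Type': 'application/json'
}
# auth=(apikey, "") sends HTTP Basic auth with the key as the username and an
# empty password, the same as --user ${POLARIS_API_KEY}: in the curl example.
response = requests.request("POST", url, auth=(apikey, ""), headers=headers, data=payload)
print(response.text)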
Sample response
A successful request returns a 200 OK response and the details of the new connection, for example:
{
"type": "kinesis",
"name": "stream-wiki",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"submittedOnTimestamp": "2022-10-08T00:30:13Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"modifiedOnTimestamp": "2022-10-08T00:30:13Z",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "kinesis.us-east-1.amazonaws.com",
"stream": "demo-stream"
}
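After the request succeeds, you may want to confirm that the returned connection echoes the values you sent. The following Python sketch compares only the request fields and ignores server-generated ones such as submittedByUser and modifiedOnTimestamp; the helper connection_matches is hypothetical.

```python
import json


def connection_matches(response_body: str, expected: dict) -> bool:
    """Check that a create-connection response echoes the fields you sent."""
    returned = json.loads(response_body)
    # Compare only the request fields; extra fields added by the server
    # (submittedByUser, submittedOnTimestamp, and so on) are ignored.
    return all(returned.get(key) == value for key, value in expected.items())


if __name__ == "__main__":
    sent = {
        "type": "kinesis",
        "name": "stream-wiki",
        "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
        "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
        "stream": "demo-stream",
    }
    # Trimmed stand-in for the sample response above.
    received = json.dumps({**sent, "submittedOnTimestamp": "2022-10-08T00:30:13Z"})
    print(connection_matches(received, sent))
```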
Start an ingestion job
Submit a POST request to the /v2/jobs endpoint to create an ingestion job.
Include the connection name, table name, and details about the input data in the request payload.
See the Jobs v2 API documentation for a description of the required parameters.
Sample request
The following example request creates an ingestion job for the wikipedia-kinesis table using the stream-wiki connection:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"name": "isRobot",
"dataType": "string"
},
{
"name": "channel",
"dataType": "string"
},
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "flags",
"dataType": "string"
},
{
"name": "isUnpatrolled",
"dataType": "string"
},
{
"name": "page",
"dataType": "string"
},
{
"name": "diffUrl",
"dataType": "string"
},
{
"name": "added",
"dataType": "long"
},
{
"name": "comment",
"dataType": "string"
},
{
"name": "commentLength",
"dataType": "string"
},
{
"name": "isNew",
"dataType": "string"
},
{
"name": "isMinor",
"dataType": "string"
},
{
"name": "delta",
"dataType": "string"
},
{
"name": "isAnonymous",
"dataType": "string"
},
{
"name": "user",
"dataType": "string"
},
{
"name": "deltaBucket",
"dataType": "long"
},
{
"name": "deleted",
"dataType": "long"
},
{
"name": "namespace",
"dataType": "string"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "isRobot",
"expression": "isRobot"
},
{
"columnName": "channel",
"expression": "channel"
},
{
"columnName": "flags",
"expression": "flags"
},
{
"columnName": "isUnpatrolled",
"expression": "isUnpatrolled"
},
{
"columnName": "page",
"expression": "page"
},
{
"columnName": "diffUrl",
"expression": "diffUrl"
},
{
"columnName": "added",
"expression": "added"
},
{
"columnName": "comment",
"expression": "comment"
},
{
"columnName": "commentLength",
"expression": "commentLength"
},
{
"columnName": "isNew",
"expression": "isNew"
},
{
"columnName": "isMinor",
"expression": "isMinor"
},
{
"columnName": "delta",
"expression": "delta"
},
{
"columnName": "isAnonymous",
"expression": "isAnonymous"
},
{
"columnName": "user",
"expression": "user"
},
{
"columnName": "deltaBucket",
"expression": "deltaBucket"
},
{
"columnName": "deleted",
"expression": "deleted"
},
{
"columnName": "namespace",
"expression": "namespace"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"name": "isRobot",
"dataType": "string"
},
{
"name": "channel",
"dataType": "string"
},
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "flags",
"dataType": "string"
},
{
"name": "isUnpatrolled",
"dataType": "string"
},
{
"name": "page",
"dataType": "string"
},
{
"name": "diffUrl",
"dataType": "string"
},
{
"name": "added",
"dataType": "long"
},
{
"name": "comment",
"dataType": "string"
},
{
"name": "commentLength",
"dataType": "string"
},
{
"name": "isNew",
"dataType": "string"
},
{
"name": "isMinor",
"dataType": "string"
},
{
"name": "delta",
"dataType": "string"
},
{
"name": "isAnonymous",
"dataType": "string"
},
{
"name": "user",
"dataType": "string"
},
{
"name": "deltaBucket",
"dataType": "long"
},
{
"name": "deleted",
"dataType": "long"
},
{
"name": "namespace",
"dataType": "string"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "isRobot",
"expression": "isRobot"
},
{
"columnName": "channel",
"expression": "channel"
},
{
"columnName": "flags",
"expression": "flags"
},
{
"columnName": "isUnpatrolled",
"expression": "isUnpatrolled"
},
{
"columnName": "page",
"expression": "page"
},
{
"columnName": "diffUrl",
"expression": "diffUrl"
},
{
"columnName": "added",
"expression": "added"
},
{
"columnName": "comment",
"expression": "comment"
},
{
"columnName": "commentLength",
"expression": "commentLength"
},
{
"columnName": "isNew",
"expression": "isNew"
},
{
"columnName": "isMinor",
"expression": "isMinor"
},
{
"columnName": "delta",
"expression": "delta"
},
{
"columnName": "isAnonymous",
"expression": "isAnonymous"
},
{
"columnName": "user",
"expression": "user"
},
{
"columnName": "deltaBucket",
"expression": "deltaBucket"
},
{
"columnName": "deleted",
"expression": "deleted"
},
{
"columnName": "namespace",
"expression": "namespace"
}
]
})
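headers = {
'Content-Type': 'application/json'
}
# auth=(apikey, "") sends HTTP Basic auth with the key as the username and an
# empty password, the same as --user ${POLARIS_API_KEY}: in the curl example.
response = requests.request("POST", url, auth=(apikey, ""), headers=headers, data=payload)
print(response.text)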
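In the job body above, __time is parsed from the timestamp field with TIME_PARSE, and every other mapping copies an input column to a table column of the same name. The following Python sketch generates the same source and mappings sections from the column list. The helper build_streaming_job and its time_column parameter are hypothetical conveniences, not part of the Jobs v2 API.

```python
import json

# (name, dataType) pairs copied from the inputSchema in the examples above.
INPUT_SCHEMA = [
    ("isRobot", "string"), ("channel", "string"), ("timestamp", "string"),
    ("flags", "string"), ("isUnpatrolled", "string"), ("page", "string"),
    ("diffUrl", "string"), ("added", "long"), ("comment", "string"),
    ("commentLength", "string"), ("isNew", "string"), ("isMinor", "string"),
    ("delta", "string"), ("isAnonymous", "string"), ("user", "string"),
    ("deltaBucket", "long"), ("deleted", "long"), ("namespace", "string"),
]


def build_streaming_job(table: str, connection: str, columns, time_column: str = "timestamp") -> dict:
    """Return a streaming job body shaped like the example above.

    The time column is parsed into __time with TIME_PARSE; every other
    column maps to a table column with the same name.
    """
    mappings = [{"columnName": "__time", "expression": f'TIME_PARSE("{time_column}")'}]
    mappings += [
        {"columnName": name, "expression": name}
        for name, _ in columns
        if name != time_column
    ]
    return {
        "type": "streaming",
        "target": {"type": "table", "tableName": table},
        "source": {
            "type": "connection",
            "connectionName": connection,
            "formatSettings": {"format": "nd-json"},
            "inputSchema": [{"name": n, "dataType": t} for n, t in columns],
        },
        "mappings": mappings,
    }


if __name__ == "__main__":
    job = build_streaming_job("wikipedia-kinesis", "stream-wiki", INPUT_SCHEMA)
    print(json.dumps(job, indent=2))
```

Generating the mappings this way keeps the input schema and mappings in sync if you add or rename a column.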
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"id": "34dbbc7e-1798-4872-8318-9154aebf8688",
"target": {
"type": "table",
"tableName": "wikipedia-kinesis"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-10-11T20:14:58.322938727Z",
"lastUpdatedTimestamp": "2022-10-11T20:14:58.322938727Z",
"source": {
"type": "connection",
"connectionName": "stream-wiki",
"inputSchema": [
{
"dataType": "string",
"name": "isRobot"
},
{
"dataType": "string",
"name": "channel"
},
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "flags"
},
{
"dataType": "string",
"name": "isUnpatrolled"
},
{
"dataType": "string",
"name": "page"
},
{
"dataType": "string",
"name": "diffUrl"
},
{
"dataType": "long",
"name": "added"
},
{
"dataType": "string",
"name": "comment"
},
{
"dataType": "string",
"name": "commentLength"
},
{
"dataType": "string",
"name": "isNew"
},
{
"dataType": "string",
"name": "isMinor"
},
{
"dataType": "string",
"name": "delta"
},
{
"dataType": "string",
"name": "isAnonymous"
},
{
"dataType": "string",
"name": "user"
},
{
"dataType": "long",
"name": "deltaBucket"
},
{
"dataType": "long",
"name": "deleted"
},
{
"dataType": "string",
"name": "namespace"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "isRobot",
"expression": "isRobot"
},
{
"columnName": "channel",
"expression": "channel"
},
{
"columnName": "flags",
"expression": "flags"
},
{
"columnName": "isUnpatrolled",
"expression": "isUnpatrolled"
},
{
"columnName": "page",
"expression": "page"
},
{
"columnName": "diffUrl",
"expression": "diffUrl"
},
{
"columnName": "added",
"expression": "added"
},
{
"columnName": "comment",
"expression": "comment"
},
{
"columnName": "commentLength",
"expression": "commentLength"
},
{
"columnName": "isNew",
"expression": "isNew"
},
{
"columnName": "isMinor",
"expression": "isMinor"
},
{
"columnName": "delta",
"expression": "delta"
},
{
"columnName": "isAnonymous",
"expression": "isAnonymous"
},
{
"columnName": "user",
"expression": "user"
},
{
"columnName": "deltaBucket",
"expression": "deltaBucket"
},
{
"columnName": "deleted",
"expression": "deleted"
},
{
"columnName": "namespace",
"expression": "namespace"
}
]
}
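In the sample response, desiredExecutionStatus is running while executionStatus is still pending, which suggests the job has been accepted but has not started yet. The following Python sketch pulls these fields, together with the job id you need to refer to the job later, out of a response body. The helper summarize_job is hypothetical; see the Jobs v2 API reference for the meaning of each status value.

```python
import json


def summarize_job(response_body: str) -> dict:
    """Extract the fields you usually check after creating a job."""
    job = json.loads(response_body)
    return {
        "id": job.get("id"),
        "desired": job.get("desiredExecutionStatus"),
        "actual": job.get("executionStatus"),
        # health may be missing or null, so fall back to an empty dict.
        "health": (job.get("health") or {}).get("status"),
    }


if __name__ == "__main__":
    # Trimmed stand-in for the sample response above.
    body = json.dumps({
        "type": "streaming",
        "id": "34dbbc7e-1798-4872-8318-9154aebf8688",
        "desiredExecutionStatus": "running",
        "executionStatus": "pending",
        "health": {"status": "ok"},
    })
    print(summarize_job(body))
```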
Learn more
See the following topics for more information:
- Ingest from Amazon Kinesis for reference on connecting from Amazon Kinesis to Polaris.
- Tables v2 API for information on creating and managing tables.
- Connections v2 API for information on creating and managing connections.
- Jobs v2 API for information on creating and managing ingestion jobs.