Ingest data from Amazon Kinesis by API

You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Amazon Kinesis. Amazon Kinesis is a real-time data streaming service provided by Amazon Web Services (AWS).

This topic covers how to create a connection to a Kinesis data stream and ingest data from the stream into a Polaris table using the API. To set up connection-based ingestion jobs in the Polaris UI instead, see Create a connection.

For a list of all ingestion options, see Sources.

Prerequisites

Before you set up ingestion from Kinesis, review Ingest from Amazon Kinesis for the required information about your Kinesis data stream and AWS access management. Grant access to the Imply role by listing its ARN as a principal in your trust policy. For more information, see Get Imply role ARN to grant access to AWS resources.

This topic assumes you have a Kinesis stream that contains data and that your IAM role has the correct permissions.

You also need a Polaris API key with the following permissions:

  • ManageTables
  • ManageConnections
  • ManageIngestionJobs

In the examples below, the key value is stored in the environment variable POLARIS_API_KEY. See Authenticate with API keys to learn how to obtain an API key and assign permissions to a service account. For more information on permissions, see Permissions reference.
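In the cURL examples, `--user ${POLARIS_API_KEY}:` sends the key as the username of an HTTP Basic credential with an empty password. The following minimal sketch shows what that option produces on the wire; the key shown is a made-up placeholder, not a real credential:

```python
import base64

def basic_auth_header(api_key: str) -> str:
    # curl --user KEY: base64-encodes "KEY:" (username "KEY", empty password)
    # and sends the result in an Authorization: Basic header.
    token = base64.b64encode(f"{api_key}:".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Hypothetical placeholder key for illustration only.
header = basic_auth_header("pok_example")
```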

Create a table

Create a table named wikipedia-kinesis that will receive the data from the ingestion job connected to the Kinesis data stream. The table schema matches the records in the stream. The following snippet shows an example record from the stream:

{
   "isRobot":true,
   "channel":"#sv.wikipedia",
   "timestamp":"2022-09-27T00:00:11.080Z",
   "flags":"NB",
   "isUnpatrolled":false,
   "page":"Salo Toraut",
   "diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918",
   "added":31,
   "comment":"Botskapande Indonesien omdirigering",
   "commentLength":35,
   "isNew":true,
   "isMinor":false,
   "delta":31,
   "isAnonymous":false,
   "user":"Lsjbot",
   "deltaBucket":0.0,
   "deleted":0,
   "namespace":"Main"
}

For more information about creating tables, see Create a table by API and the Tables v2 API reference.

The following request creates the wikipedia-kinesis table:

cURL
Python
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "detail",
  "name": "wikipedia-kinesis",
  "schema": [
    { "name": "isRobot", "dataType": "string" },
    { "name": "channel", "dataType": "string" },
    { "name": "timestamp", "dataType": "string" },
    { "name": "flags", "dataType": "string" },
    { "name": "isUnpatrolled", "dataType": "string" },
    { "name": "page", "dataType": "string" },
    { "name": "diffUrl", "dataType": "string" },
    { "name": "added", "dataType": "long" },
    { "name": "comment", "dataType": "string" },
    { "name": "commentLength", "dataType": "string" },
    { "name": "isNew", "dataType": "string" },
    { "name": "isMinor", "dataType": "string" },
    { "name": "delta", "dataType": "string" },
    { "name": "isAnonymous", "dataType": "string" },
    { "name": "user", "dataType": "string" },
    { "name": "deltaBucket", "dataType": "long" },
    { "name": "deleted", "dataType": "long" },
    { "name": "namespace", "dataType": "string" }
  ]
}'

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "detail",
    "name": "wikipedia-kinesis",
    "schema": [
        { "name": "isRobot", "dataType": "string" },
        { "name": "channel", "dataType": "string" },
        { "name": "timestamp", "dataType": "string" },
        { "name": "flags", "dataType": "string" },
        { "name": "isUnpatrolled", "dataType": "string" },
        { "name": "page", "dataType": "string" },
        { "name": "diffUrl", "dataType": "string" },
        { "name": "added", "dataType": "long" },
        { "name": "comment", "dataType": "string" },
        { "name": "commentLength", "dataType": "string" },
        { "name": "isNew", "dataType": "string" },
        { "name": "isMinor", "dataType": "string" },
        { "name": "delta", "dataType": "string" },
        { "name": "isAnonymous", "dataType": "string" },
        { "name": "user", "dataType": "string" },
        { "name": "deltaBucket", "dataType": "long" },
        { "name": "deleted", "dataType": "long" },
        { "name": "namespace", "dataType": "string" }
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Create a connection to Amazon Kinesis

When creating a connection to a Kinesis data stream, specify the stream name and endpoint as well as details of the AWS role for access to the Kinesis stream. For the requisite information to connect to a Kinesis stream from Polaris, see Ingest from Amazon Kinesis. Each connection is associated with a single source of data. If you plan to ingest data from multiple streams, create a new connection for each one.

Send a POST request to the /v2/connections endpoint to create a connection. For a description of the required parameters, see the Connections v2 API documentation.

Sample request

The following example request creates a connection named stream-wiki to stream demo-stream from Kinesis:

cURL
Python
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "kinesis",
  "name": "stream-wiki",
  "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
  "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
  "stream": "demo-stream"
}'

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "kinesis",
    "name": "stream-wiki",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
    "stream": "demo-stream"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response and the details of the connection, for example:

{
    "type": "kinesis",
    "name": "stream-wiki",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "submittedOnTimestamp": "2022-10-08T00:30:13Z",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "modifiedOnTimestamp": "2022-10-08T00:30:13Z",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "kinesis.us-east-1.amazonaws.com",
    "stream": "demo-stream"
}
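Before you reference the connection in an ingestion job, you can read it back and confirm the fields a job will need. The following is a minimal sketch using only the Python standard library, assuming the Connections v2 API supports `GET /v2/connections/{name}` (check the Connections v2 API reference); `validate_connection` is a hypothetical helper, not part of the API:

```python
import json
import os
import urllib.request

BASE = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io"

def validate_connection(conn: dict) -> bool:
    """Return True when a Kinesis connection payload carries every field
    an ingestion job will need."""
    required = ("name", "stream", "awsEndpoint", "awsAssumedRoleArn")
    return conn.get("type") == "kinesis" and all(conn.get(k) for k in required)

def fetch_connection(name: str) -> dict:
    """Retrieve a connection by name (assumes GET /v2/connections/{name})."""
    req = urllib.request.Request(f"{BASE}/v2/connections/{name}")
    req.add_header("Authorization", f"Basic {os.getenv('POLARIS_API_KEY', '')}")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```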

Start an ingestion job

Submit a POST request to the /v2/jobs endpoint to create an ingestion job. Include the connection name, table name, and details about the input data in the request payload.

See the Jobs v2 API documentation for a description of the required parameters.

Sample request

The following example request creates an ingestion job for the wikipedia-kinesis table using the stream-wiki connection:

cURL
Python
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "wikipedia-kinesis"
  },
  "source": {
    "type": "connection",
    "connectionName": "stream-wiki",
    "formatSettings": { "format": "nd-json" },
    "inputSchema": [
      { "name": "isRobot", "dataType": "string" },
      { "name": "channel", "dataType": "string" },
      { "name": "timestamp", "dataType": "string" },
      { "name": "flags", "dataType": "string" },
      { "name": "isUnpatrolled", "dataType": "string" },
      { "name": "page", "dataType": "string" },
      { "name": "diffUrl", "dataType": "string" },
      { "name": "added", "dataType": "long" },
      { "name": "comment", "dataType": "string" },
      { "name": "commentLength", "dataType": "string" },
      { "name": "isNew", "dataType": "string" },
      { "name": "isMinor", "dataType": "string" },
      { "name": "delta", "dataType": "string" },
      { "name": "isAnonymous", "dataType": "string" },
      { "name": "user", "dataType": "string" },
      { "name": "deltaBucket", "dataType": "long" },
      { "name": "deleted", "dataType": "long" },
      { "name": "namespace", "dataType": "string" }
    ]
  },
  "mappings": [
    { "columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")" },
    { "columnName": "isRobot", "expression": "isRobot" },
    { "columnName": "channel", "expression": "channel" },
    { "columnName": "flags", "expression": "flags" },
    { "columnName": "isUnpatrolled", "expression": "isUnpatrolled" },
    { "columnName": "page", "expression": "page" },
    { "columnName": "diffUrl", "expression": "diffUrl" },
    { "columnName": "added", "expression": "added" },
    { "columnName": "comment", "expression": "comment" },
    { "columnName": "commentLength", "expression": "commentLength" },
    { "columnName": "isNew", "expression": "isNew" },
    { "columnName": "isMinor", "expression": "isMinor" },
    { "columnName": "delta", "expression": "delta" },
    { "columnName": "isAnonymous", "expression": "isAnonymous" },
    { "columnName": "user", "expression": "user" },
    { "columnName": "deltaBucket", "expression": "deltaBucket" },
    { "columnName": "deleted", "expression": "deleted" },
    { "columnName": "namespace", "expression": "namespace" }
  ]
}'

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "wikipedia-kinesis"
    },
    "source": {
        "type": "connection",
        "connectionName": "stream-wiki",
        "formatSettings": { "format": "nd-json" },
        "inputSchema": [
            { "name": "isRobot", "dataType": "string" },
            { "name": "channel", "dataType": "string" },
            { "name": "timestamp", "dataType": "string" },
            { "name": "flags", "dataType": "string" },
            { "name": "isUnpatrolled", "dataType": "string" },
            { "name": "page", "dataType": "string" },
            { "name": "diffUrl", "dataType": "string" },
            { "name": "added", "dataType": "long" },
            { "name": "comment", "dataType": "string" },
            { "name": "commentLength", "dataType": "string" },
            { "name": "isNew", "dataType": "string" },
            { "name": "isMinor", "dataType": "string" },
            { "name": "delta", "dataType": "string" },
            { "name": "isAnonymous", "dataType": "string" },
            { "name": "user", "dataType": "string" },
            { "name": "deltaBucket", "dataType": "long" },
            { "name": "deleted", "dataType": "long" },
            { "name": "namespace", "dataType": "string" }
        ]
    },
    "mappings": [
        { "columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")" },
        { "columnName": "isRobot", "expression": "isRobot" },
        { "columnName": "channel", "expression": "channel" },
        { "columnName": "flags", "expression": "flags" },
        { "columnName": "isUnpatrolled", "expression": "isUnpatrolled" },
        { "columnName": "page", "expression": "page" },
        { "columnName": "diffUrl", "expression": "diffUrl" },
        { "columnName": "added", "expression": "added" },
        { "columnName": "comment", "expression": "comment" },
        { "columnName": "commentLength", "expression": "commentLength" },
        { "columnName": "isNew", "expression": "isNew" },
        { "columnName": "isMinor", "expression": "isMinor" },
        { "columnName": "delta", "expression": "delta" },
        { "columnName": "isAnonymous", "expression": "isAnonymous" },
        { "columnName": "user", "expression": "user" },
        { "columnName": "deltaBucket", "expression": "deltaBucket" },
        { "columnName": "deleted", "expression": "deleted" },
        { "columnName": "namespace", "expression": "namespace" }
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:


{
    "type": "streaming",
    "id": "34dbbc7e-1798-4872-8318-9154aebf8688",
    "target": {
        "type": "table",
        "tableName": "wikipedia-kinesis"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-10-11T20:14:58.322938727Z",
    "lastUpdatedTimestamp": "2022-10-11T20:14:58.322938727Z",
    "source": {
        "type": "connection",
        "connectionName": "stream-wiki",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "isRobot"
            },
            {
                "dataType": "string",
                "name": "channel"
            },
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "flags"
            },
            {
                "dataType": "string",
                "name": "isUnpatrolled"
            },
            {
                "dataType": "string",
                "name": "page"
            },
            {
                "dataType": "string",
                "name": "diffUrl"
            },
            {
                "dataType": "long",
                "name": "added"
            },
            {
                "dataType": "string",
                "name": "comment"
            },
            {
                "dataType": "string",
                "name": "commentLength"
            },
            {
                "dataType": "string",
                "name": "isNew"
            },
            {
                "dataType": "string",
                "name": "isMinor"
            },
            {
                "dataType": "string",
                "name": "delta"
            },
            {
                "dataType": "string",
                "name": "isAnonymous"
            },
            {
                "dataType": "string",
                "name": "user"
            },
            {
                "dataType": "long",
                "name": "deltaBucket"
            },
            {
                "dataType": "long",
                "name": "deleted"
            },
            {
                "dataType": "string",
                "name": "namespace"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "isRobot",
            "expression": "isRobot"
        },
        {
            "columnName": "channel",
            "expression": "channel"
        },
        {
            "columnName": "flags",
            "expression": "flags"
        },
        {
            "columnName": "isUnpatrolled",
            "expression": "isUnpatrolled"
        },
        {
            "columnName": "page",
            "expression": "page"
        },
        {
            "columnName": "diffUrl",
            "expression": "diffUrl"
        },
        {
            "columnName": "added",
            "expression": "added"
        },
        {
            "columnName": "comment",
            "expression": "comment"
        },
        {
            "columnName": "commentLength",
            "expression": "commentLength"
        },
        {
            "columnName": "isNew",
            "expression": "isNew"
        },
        {
            "columnName": "isMinor",
            "expression": "isMinor"
        },
        {
            "columnName": "delta",
            "expression": "delta"
        },
        {
            "columnName": "isAnonymous",
            "expression": "isAnonymous"
        },
        {
            "columnName": "user",
            "expression": "user"
        },
        {
            "columnName": "deltaBucket",
            "expression": "deltaBucket"
        },
        {
            "columnName": "deleted",
            "expression": "deleted"
        },
        {
            "columnName": "namespace",
            "expression": "namespace"
        }
    ]
}
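Note that the job is created with `executionStatus` of `pending`; streaming jobs transition to `running` once provisioned. The following is a minimal polling sketch, assuming the Jobs v2 API supports `GET /v2/jobs/{id}` (see the Jobs v2 API reference); `job_state` and `wait_until_running` are hypothetical helpers written for this example:

```python
import json
import os
import time
import urllib.request

BASE = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io"

def job_state(job: dict) -> str:
    """Extract the current execution status from a Jobs v2 payload."""
    return job.get("executionStatus", "unknown")

def wait_until_running(job_id: str, timeout_s: int = 300) -> str:
    """Poll the job until it leaves 'pending' (assumes GET /v2/jobs/{id})."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        req = urllib.request.Request(f"{BASE}/v2/jobs/{job_id}")
        req.add_header("Authorization", f"Basic {os.getenv('POLARIS_API_KEY', '')}")
        with urllib.request.urlopen(req) as resp:
            state = job_state(json.load(resp))
        if state != "pending":
            return state
        time.sleep(10)
    raise TimeoutError(f"job {job_id} still pending after {timeout_s}s")
```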

Learn more

See the following topics for more information:

  • Ingest from Amazon Kinesis for reference on connecting from Amazon Kinesis to Polaris.
  • Tables v2 API for information on creating and managing tables.
  • Connections v2 API for information on creating and managing connections.
  • Jobs v2 API for information on creating and managing ingestion jobs.