Ingest data from Confluent Cloud by API

You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Confluent Cloud. Confluent Cloud is a fully managed, cloud-native service for Apache Kafka.

This topic describes how to create a connection to Confluent Cloud, create a Polaris table to receive the data, and then create a job that ingests data from the source into the table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.

Prerequisites

This topic assumes you have an active Confluent Cloud account. Before you set up ingestion from Confluent Cloud, you need the following:

  • Apache Kafka events to ingest into Polaris. See Supported data and file formats for the supported data formats for streaming ingestion.
    Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. You can use batch file ingestion if you want to ingest older data.
  • The name of the Confluent Cloud topic that contains the event data.
  • Bootstrap server information: a list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster. This list should be in the form host1:port1,host2:port2,... For details on where to find the bootstrap server in Confluent Cloud, see Access cluster settings in the Confluent Cloud Console.
    You can also use the Confluent Cloud API to read a cluster and find its bootstrap server information in the kafka_bootstrap_endpoint of the cluster object.
  • A Confluent Cloud API key and secret that Polaris will use to make the connection. Note that resource-specific API keys in Confluent Cloud require an access control list that restricts access. See Use API keys to control access for more information.

You also need a Polaris API key with the following permissions:

  • ManageTables
  • ManageConnections
  • ManageIngestionJobs

In the examples below, the key value is stored in the environment variable POLARIS_API_KEY. For information on how to obtain an API key and assign service account permissions, see Authenticate with API keys. For more information on permissions, see Permissions reference.
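
All the examples below share the same setup: they read the API key from the environment and send it in a Basic authorization header. The following minimal Python sketch shows that shared setup; the ORGANIZATION_NAME, REGION, and CLOUD_PROVIDER placeholders stand in for your own Polaris deployment details.

import os

# Read the Polaris API key from the environment and fail fast if it's missing.
apikey = os.getenv("POLARIS_API_KEY")
if apikey is None:
    raise RuntimeError("Set the POLARIS_API_KEY environment variable first.")

# Base URL for the Polaris v2 APIs. Replace the placeholders with the
# details of your Polaris deployment.
base_url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2"

# Headers shared by every request in this topic.
headers = {
    "Authorization": f"Basic {apikey}",
    "Content-Type": "application/json",
}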

Create a table

Create a new table to receive the ingested data, or locate the details of an existing table.

The following sample request creates the sample-confluent-ingestion table using the Tables v2 API.

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "detail",
    "name": "sample-confluent-ingestion",
    "schema": [
        {
            "name": "__time",
            "dataType": "timestamp"
        },
        {
            "name": "country",
            "dataType": "string"
        },
        {
            "name": "city",
            "dataType": "string"
        }
    ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "detail",
    "name": "sample-confluent-ingestion",
    "schema": [
        {
            "name": "__time",
            "dataType": "timestamp"
        },
        {
            "name": "country",
            "dataType": "string"
        },
        {
            "name": "city",
            "dataType": "string"
        }
    ]
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Create a connection to Confluent Cloud

Each connection is associated with a single source of data. If you plan to ingest data from multiple topics, create a new connection for each one.

Send a POST request to the /v2/connections endpoint to create a connection.

See the Prerequisites and the Connections v2 API documentation for a description of required parameters.

Sample request

The following example request creates a connection named my_connection to topic my_topic in Confluent Cloud:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response and the details of the connection, for example:

{
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
}

Add your key and secret to the connection

Send a PUT request to the /v2/connections/{name}/secrets endpoint to add key and secret credentials to the connection. Replace {name} in the path with the name of your connection in Polaris.

See the Connections v2 API documentation for a description of required parameters.

Sample request

The following example request adds a Confluent Cloud key and secret to a connection named my_connection. Replace $CONFLUENT_KEY and $CONFLUENT_SECRET with your Confluent Cloud API key and secret:

cURL

curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "confluent",
    "key": "$CONFLUENT_KEY",
    "secret": "$CONFLUENT_SECRET"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "confluent",
    "key": "$CONFLUENT_KEY",
    "secret": "$CONFLUENT_SECRET"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("PUT", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response.

Test a connection

Send a POST request to the /v2/connections/{name}/test endpoint to test a connection.

If the connection fails, the response includes detailed information about the reason for the failure so you can update the connection and try again. Use the /v2/connections/{name} and /v2/connections/{name}/secrets endpoints to update your connection. See the Connections v2 API documentation for details.
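
For example, the following minimal Python sketch tests the my_connection connection created above; it assumes the test endpoint requires no request body.

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/test"

apikey = os.getenv("POLARIS_API_KEY")

headers = {
    'Authorization': f'Basic {apikey}'
}

# POST to the test endpoint; on failure, the response body explains why.
response = requests.request("POST", url, headers=headers)

print(response.status_code)
print(response.text)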

Start an ingestion job

Send a POST request to the /v2/jobs endpoint to create an ingestion job using your connection and table details.

See the Jobs v2 API documentation for a description of required parameters. You can include the optional readFromPoint property in the job request to instruct Polaris to read data from the earliest or latest point available in the stream.
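
For example, a job request that reads from the beginning of the stream might include the property as sketched below; the placement inside the source object and the value "earliest" are assumptions, so check the Jobs v2 API documentation for the exact usage.

"source": {
    "type": "connection",
    "connectionName": "my_connection",
    "readFromPoint": "earliest",
    ...
}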

You can create multiple ingestion jobs with the same connection—for example, to ingest event data from a topic into both a detail table and an aggregate table.

Sample request

The following example request creates an ingestion job for the sample-confluent-ingestion table using the my_connection connection.

Use source type connection for an ingestion job that connects to Confluent Cloud.

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    }
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    }
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job.

{
    "type": "streaming",
    "id": "ad837s4k-4633-6g56-8e23-7384kd905273",
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "my_username",
        "userID": "12345-1a3b-36s9-7e23-277g7899c223"
    },
    "lastModifiedBy": {
        "username": "my_username",
        "userID": "12345-1a3b-36s9-7e23-277g7899c223"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-07-28T12:52:06.365276805Z",
    "lastUpdatedTimestamp": "2022-07-28T12:52:06.365276805Z",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ]
}

Add data to your topic

Once the ingestion job is running, add event data to your Confluent Cloud topic. For example, the following is a single message produced to the topic my_topic. Note that the data structure matches the input schema defined in the ingestion job.

{
    "date": "2022-08-01", 
    "time": "02:47:05.474Z", 
    "country": "australia",
    "city": "darwin"
}
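
If you produce events programmatically, the following Python sketch publishes the message above with the confluent-kafka client. It is a minimal example rather than a Polaris requirement: the bootstrap server and topic match the earlier examples, and the CONFLUENT_KEY and CONFLUENT_SECRET environment variables are assumed to hold your Confluent Cloud API key and secret.

import json
import os

from confluent_kafka import Producer

# Connect to the Confluent Cloud cluster over SASL/SSL using the same
# bootstrap server as the connection examples above.
producer = Producer({
    "bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("CONFLUENT_KEY"),
    "sasl.password": os.getenv("CONFLUENT_SECRET"),
})

# An event whose structure matches the input schema of the ingestion job.
event = {
    "date": "2022-08-01",
    "time": "02:47:05.474Z",
    "country": "australia",
    "city": "darwin",
}

# Produce the event to the topic and wait for delivery.
producer.produce("my_topic", value=json.dumps(event))
producer.flush()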

Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. Use batch file ingestion if you want to ingest older data.

The new data is ingested into the Polaris table you specified in the job.

Cancel a job

To cancel an ingestion job, follow these steps:

  1. Send a GET request to the /v2/jobs endpoint to find the job ID.
  2. Send a PUT request with the job ID to the /v2/jobs/{jobId} endpoint and set the desiredExecutionStatus to canceled.

See the Jobs v2 API documentation for a description of required parameters.
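
For step 1, the following Python sketch lists jobs and prints the fields you need to identify the job to cancel. The values wrapper in the parsed response is an assumption about the list response shape, so check the Jobs v2 API documentation for the exact format.

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

headers = {
    'Authorization': f'Basic {apikey}'
}

response = requests.request("GET", url, headers=headers)

# Print the ID, target table, and execution status of each job.
for job in response.json().get("values", []):
    print(job["id"], job.get("target", {}).get("tableName"), job["executionStatus"])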

Sample request

The following example request performs step 2: it cancels the job with the specified job ID.

cURL

curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/gs65dfe6-32f5-7s3k-n5sd-gre007e74422' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "desiredExecutionStatus": "canceled"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/gs65dfe6-32f5-7s3k-n5sd-gre007e74422"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "desiredExecutionStatus": "canceled"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("PUT", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response and the details of the canceled job.

Monitor an ingestion job

See Monitor streaming ingestion from a connection for information on how to monitor the status of your ingestion job in the Polaris UI.
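
If you prefer to check job status programmatically, you can poll the same /v2/jobs endpoint used above and read the executionStatus and health fields shown in the sample response earlier in this topic. The following Python sketch is one way to do that; as before, the values wrapper is an assumption about the list response shape.

import os
import time
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")
headers = {'Authorization': f'Basic {apikey}'}

# Hypothetical job ID taken from the create-job response.
job_id = "ad837s4k-4633-6g56-8e23-7384kd905273"

# Poll until the job leaves the pending state, reporting status and health.
while True:
    jobs = requests.get(url, headers=headers).json().get("values", [])
    job = next((j for j in jobs if j["id"] == job_id), None)
    if job is None:
        print("job not found")
        break
    print(job["executionStatus"], job["health"]["status"])
    if job["executionStatus"] != "pending":
        break
    time.sleep(30)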

Known limitations

We're regularly updating Polaris to add features and fixes. If you encounter an issue, check the Known limitations page.

Learn more

See the following topics for more information:

  • Ingest from Confluent Cloud for reference on connecting from Confluent Cloud to Polaris.
  • Tables v2 API for information on creating and managing tables.
  • Connections v2 API for information on creating and managing connections.
  • Jobs v2 API for information on creating and managing ingestion jobs.