Ingest data from Apache Kafka and Amazon MSK by API

You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Apache Kafka streams, including from Amazon Managed Streaming for Apache Kafka (MSK). Amazon MSK is a fully managed, cloud-native service for Apache Kafka.

This topic covers how to create a connection to Kafka or MSK, add authentication credentials to it, and use the connection in a streaming ingestion job.

Prerequisites

This topic assumes you have an active Apache Kafka instance that is reachable on the public internet or an Amazon MSK instance running in your AWS account.

To set up ingestion from Kafka, you need:

  • Apache Kafka events to ingest into Polaris. See Supported data and file formats for the data formats supported for streaming ingestion.
    Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. To ingest older data, use batch file ingestion.
  • The name of the Kafka topic that contains the event data.
  • Bootstrap server information: a list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster. This list should be in the form host1:port1,host2:port2,... For details on where to find the bootstrap server in Amazon MSK, see Getting the bootstrap brokers using the AWS Management Console.
  • Apache Kafka username and password for Polaris to use to make the connection. Polaris supports Simple Authentication and Security Layer (SASL) PLAIN and SASL SCRAM. For SASL SCRAM connections, you must also provide the SCRAM mechanism, either SCRAM-SHA-256 or SCRAM-SHA-512.

    Polaris does not currently support IAM authentication for Amazon MSK.

From the Polaris side, you need the following:

  • A Polaris API key with the ManageConnections permission. If you also create tables or ingestion jobs, you additionally need the ManageTables and ManageIngestionJobs permissions, respectively. The examples in this topic use a variable named POLARIS_API_KEY to store the API key; the sketch after this list shows how the examples use it.
  • A destination table to store events from Apache Kafka.
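
The examples in this topic authenticate by passing the API key as the username in HTTP Basic authentication with an empty password, which is what --user ${POLARIS_API_KEY}: does in the cURL examples. The following sketch shows that pattern in Python; the GET /v2/connections call used to verify the key is an assumption based on the Connections v2 API documentation:

import os
import requests

# Base URL for your Polaris project; replace the placeholders with your values.
base_url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io"

# The API key acts as the HTTP Basic auth username with an empty password,
# which matches `--user ${POLARIS_API_KEY}:` in the cURL examples.
apikey = os.getenv("POLARIS_API_KEY")

# Listing existing connections (GET /v2/connections) is assumed here as a
# simple way to confirm the key and permissions work; see the Connections v2
# API documentation for the exact endpoints available.
response = requests.get(f"{base_url}/v2/connections", auth=(apikey, ""))
response.raise_for_status()
print(response.json())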

Create a connection to Kafka

Each connection is associated with a single source of data. If you plan to ingest data from multiple topics, create a new connection for each one.

Send a POST request to the /v2/connections endpoint to create a connection. Set the following properties in the payload:

  • type: kafka
  • name: an arbitrary descriptive name for your connection
  • bootstrapServers: the list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster in the form host1:port1,host2:port2,...
  • topicName: the name of the Kafka topic to read from

See the Prerequisites and the Connections v2 API documentation for a description of required parameters.

Sample request

The following example request creates a connection named my_connection to topic my_topic in Kafka with a bootstrap server of my-kafka-cluster.example.com:9092.

cURL
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
})

headers = {
    "Content-Type": "application/json"
}

# Pass the API key as the Basic auth username with an empty password,
# equivalent to `--user ${POLARIS_API_KEY}:` in the cURL example.
response = requests.post(url, auth=(apikey, ""), headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response along with the details of the new connection, for example:

{
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
}

Add your credentials to the connection

Send a PUT request to the /v2/connections/{name}/secrets endpoint to add authentication credentials to the connection. Replace {name} in the path with the name of your connection in Polaris, for example, my_connection. Set the following properties in the payload:

  • type: Either sasl_plain or sasl_scram.
  • username: The SASL username for your Kafka cluster.
  • password: The SASL password for your Kafka cluster.
  • mechanism: For SASL SCRAM connections only, the SCRAM mechanism: either SCRAM-SHA-256 or SCRAM-SHA-512.

See the Connections v2 API documentation for a description of required parameters.

Sample request

The following example request adds a Kafka username and password to a connection named my_connection. Replace KAFKA_USERNAME and KAFKA_PASSWORD with the SASL credentials for your Kafka cluster; the Python example reads them from environment variables with those names:

cURL
curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "sasl_plain",
    "username": "KAFKA_USERNAME",
    "password": "KAFKA_PASSWORD"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets"

apikey = os.getenv("POLARIS_API_KEY")

# Read the SASL credentials from environment variables.
payload = json.dumps({
    "type": "sasl_plain",
    "username": os.getenv("KAFKA_USERNAME"),
    "password": os.getenv("KAFKA_PASSWORD")
})

headers = {
    "Content-Type": "application/json"
}

# Pass the API key as the Basic auth username with an empty password,
# equivalent to `--user ${POLARIS_API_KEY}:` in the cURL example.
response = requests.put(url, auth=(apikey, ""), headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response.

Test a connection

You can send a POST request to the /v2/connections/{name}/test endpoint to test a connection.
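
For example, the following Python sketch tests the my_connection connection created earlier. Sending the request with an empty body is an assumption; check the Connections v2 API documentation for the exact request format:

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/test"

apikey = os.getenv("POLARIS_API_KEY")

# POST with an empty request body (assumed); the API key is passed as the
# Basic auth username with an empty password.
response = requests.post(url, auth=(apikey, ""))

print(response.status_code)
print(response.text)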

If the connection fails, the response includes detailed information about the reason for the failure, so you can update the connection and try again. Use the /v2/connections/{name} and /v2/connections/{name}/secrets endpoints to update your connection. See the Connections v2 API documentation for details.

Start an ingestion job

After you successfully create a connection to Kafka, create an ingestion job to start ingesting data. Send a POST request to the /v2/jobs endpoint to create an ingestion job that reads from your connection.

Set the connection information in the source object in the ingestion job payload. For example:

    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        },
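
The complete job payload also includes fields not shown in this topic, such as the job type, the target table, and the input schema and column mappings. See the Jobs v2 API documentation and Create an ingestion job by API for the full specification. The following Python sketch only illustrates the shape of the request: the source object matches the fragment above, while the remaining fields, including the table name my_table, are illustrative placeholders rather than a complete job spec:

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

# Illustrative payload only: the "source" object comes from this topic; the
# job type and target fields are placeholders to fill in from the Jobs v2 API
# documentation and your table definition.
payload = json.dumps({
    "type": "streaming",            # assumed job type for stream ingestion
    "target": {
        "type": "table",
        "tableName": "my_table"     # hypothetical destination table
    },
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        }
        # Add the input schema for your Kafka events here, as described in
        # the Jobs v2 API documentation.
    }
    # Add column mappings here if your table schema requires them.
})

headers = {
    "Content-Type": "application/json"
}

response = requests.post(url, auth=(apikey, ""), headers=headers, data=payload)

print(response.text)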

Learn more

See the following topics for more information:

  • Create a table by API for information on creating and managing tables.
  • Connections v2 API for information on creating and managing connections.
  • Create an ingestion job by API for information on creating and managing ingestion jobs.