Ingest events with the Kafka Connector for Imply Polaris

Imply provides a Kafka Connector for Imply Polaris (Kafka connector). You can use the Kafka connector to publish your event stream to Polaris through the Polaris Events API.

The Kafka connector provides exactly-once event ingestion during normal operation. During failure recovery scenarios, it provides at-least-once event ingestion with the possibility of some duplicate rows.

This topic covers the installation and configuration instructions to run the Kafka connector. See Kafka connector configuration reference for a list of configuration options.

How the Kafka connector works

To send events to Polaris, configure Kafka Connect to read data from a Kafka topic (source) and publish data to a Polaris streaming connection (sink).

The following diagram shows a sample configuration. The Kafka connector reads from three Kafka topics and pushes the events to two Polaris push streaming connections:

Kafka connector diagram

The Kafka connector polls Kafka for data. Before sending data to Polaris, it creates batches of events and compresses them. The default batch size is 500 KB before compression. For example, if the connector receives 2 MB of data from Kafka, it divides the data into four batches of 500 KB each, then compresses each batch and sends it to Polaris. If the connector receives less than 500 KB from Kafka within a polling period, it sends that data to Polaris anyway.

The examples in this topic assume Kafka Connect is installed to /usr/local/kafka-connect/. They use "example" for the Polaris organization name.

Prerequisites

Before you set up the Kafka connector, familiarize yourself with the basics of Kafka Connect. See the following:

  • Apache Kafka Connect docs
  • Confluent's documentation for Kafka Connect

To set up the Kafka connector, you need the following:

  • A Polaris OAuth client with the ManageConnections permission. To create an ingestion job, you also need the ManageIngestionJobs permission. Use the client ID, the client secret, and your organization ID to configure the connector. See Authenticate with OAuth for details. For information on permissions, see Permissions reference. A sample token request appears after this list.

  • A push streaming source connection to use as the destination for the Kafka connector. The connection includes the Events API endpoint to send data to your table in Polaris. For example, https://example.us-east-1.aws.api.imply.io/v1/events/kafka-connector. For more information, see Push event data.

  • Kafka version 2.6.3 or later, with Kafka Connect running the same version. If you run a version of Kafka earlier than 2.6.3, you must deploy Kafka 2.6.3 or later.

  • Administrative access and configuration details for your Kafka cluster and the event stream topic to publish to Polaris.

  • Kafka Connect running either as a standalone process or as a distributed service. To bootstrap a Confluent Kafka Connect deployment, see the CP-Kafka Connect Helm Chart.

  • Java 11 or higher
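
As a quick check of your OAuth client, the following is a minimal sketch of requesting an access token, assuming the standard client_credentials grant and the token URL format shown later in this topic. The connector requests tokens automatically through its oauth2.* properties, so this manual request is only a way to confirm that your credentials work. Replace ORGANIZATION_ID, API_CLIENT_ID, and API_CLIENT_SECRET with your own values:

curl --location --request POST 'https://id.imply.io/auth/realms/ORGANIZATION_ID/protocol/openid-connect/token' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'grant_type=client_credentials' \
--data-urlencode 'client_id=API_CLIENT_ID' \
--data-urlencode 'client_secret=API_CLIENT_SECRET'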

Supported input types

You can use the input data types supported by Kafka Connect converters.

Make sure your Kafka Connect instance is set up with the correct converters (key.converter and value.converter) for your data set.
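
For example, the following is a minimal sketch of converter settings in the Kafka Connect worker configuration, assuming JSON-encoded record values without embedded schemas:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false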

Regardless of the input type, the Kafka connector converts the data to JSON before pushing it to the Events API.

For more detail, see the following topics:

  • Converters and Serialization Explained
  • Configuring Key and Value Converters
  • Using Kafka Connect with Schema Registry

Time column requirements and configuration

Polaris requires events to contain an ISO-formatted timestamp column. See Timestamp expressions for examples of transforming an existing timestamp and handling missing timestamps.
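
For example, the following hypothetical event payload includes an ISO 8601 timestamp column named timestamp, which you can map to __time in your ingestion job:

{"timestamp": "2023-06-15T08:30:00.000Z", "channel": "wiki_edits", "edits": 3}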

Configure Kafka Connect

The steps in this section describe how to configure a connectors directory for Kafka Connect. If you already have Kafka Connect set up and are familiar with how Kafka Connect loads connectors, you can skip this section.

  1. Make a directory called connectors to store the connector. For example:
    mkdir -p /usr/local/kafka-connect/connectors
    
  2. Edit the Kafka Connect configuration file:
    • For standalone mode: /usr/local/kafka-connect/config/connect-standalone.properties.
    • For distributed mode: /usr/local/kafka-connect/config/connect-distributed.properties.
  3. Set the value for the plugin path to the directory containing the connector. For example:
    plugin.path=/usr/local/kafka-connect/connectors/ 
    
  4. Verify that bootstrap.servers points to your Kafka host and port. It defaults to localhost:9092. A combined sketch of these settings follows this list.
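
Taken together, the relevant lines in connect-standalone.properties (or connect-distributed.properties) might look like the following sketch, assuming the example paths used in this topic:

bootstrap.servers=localhost:9092
plugin.path=/usr/local/kafka-connect/connectors/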

For additional information, including documentation for distributed configuration properties, see Kafka Connect Configurations.

Install the Kafka connector

The steps in this section describe how to install the Kafka connector. If you run the connector in distributed mode, perform the steps for each Kafka Connect node.

  1. Navigate to the connectors directory. For example:
    cd /usr/local/kafka-connect/connectors
    
  2. Download the Kafka connector to the machines running Kafka Connect workers. For example:
    curl -o polaris-kafka-connector-1.0.0.zip https://static.imply.io/support/polaris-kafka-connector-1.0.0.zip
    
  3. Extract the connector.
    unzip polaris-kafka-connector-1.0.0.zip
    

After you finish installing the connector, proceed with configuration.
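
To confirm that Kafka Connect can find the plugin, you can list the contents of the connectors directory referenced by plugin.path. For example:

ls /usr/local/kafka-connect/connectors/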

Create a streaming ingestion job

Create a streaming ingestion job to ingest from your push streaming connection. You can add an ingestion job in the UI or using the API. See Start a job to ingest data for more detail.

Configure a standalone Kafka connector

You can run the Kafka connector in standalone mode. Standalone mode is useful for learning to use the connector or for testing, but it isn't suitable for most production environments.

The distribution of the Kafka connector doesn't include a configuration file. This section helps you download and edit the connector configuration for a standalone instance.

  1. Navigate to your Kafka Connect configuration directory. For example:

    cd /usr/local/kafka-connect/config
    
  2. Download the Kafka connector configuration file: http-connect.config. For example:

    curl -O https://docs.imply.io/polaris/assets/files/http-connect.config
    
  3. Edit the following properties in the configuration file. A filled-in example follows this list:

    • topics: A list of one or more Kafka topics the connector subscribes to. For example: topics: example_topic, wiki_edits.
    • http.url: For a single topic, the URL of the Polaris streaming connection. For example, https://example.us-east-1.aws.api.imply.io/v1/events/kafka-connector. To map one or more topics to connections using topic.endpoint.mapping, use the base URL for the Events API. For example: https://example.us-east-1.aws.api.imply.io/v1/events/.
    • topic.endpoint.mapping: A mapping of Kafka event streams to Polaris connection endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector. You cannot map a single topic to multiple endpoints.
    • oauth2.access.token.url: the OAuth URL for Polaris. Replace ORGANIZATION_ID with your own organization ID.
    • oauth2.client.id: Polaris OAuth client ID.
    • oauth2.client.secret: Polaris OAuth client secret.
    • reporter.bootstrap.servers: host and port for Kafka. Defaults to localhost:9092.
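
The following is a filled-in sketch of http-connect.config for a single topic, assuming the example organization and paths used in this topic. The exact set of properties in the downloaded file may differ:

name=PolarisHttpSinkConnector
connector.class=io.imply.kafka.connect.http.HttpSinkConnector
tasks.max=1
topics=example_topic
http.url=https://example.us-east-1.aws.api.imply.io/v1/events/kafka-connector
http.authorization.type=oauth2
oauth2.access.token.url=https://id.imply.io/auth/realms/ORGANIZATION_ID/protocol/openid-connect/token
oauth2.client.id=API_CLIENT_ID
oauth2.client.secret=API_CLIENT_SECRET
oauth2.client.authorization.mode=URL
reporter.bootstrap.servers=localhost:9092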

Launch a standalone connector

Start the connector in standalone mode as follows:

  1. Navigate to the Kafka Connect directory. For example:
    cd /usr/local/kafka-connect/
    
  2. Run the standalone connector as follows:
    bin/connect-standalone.sh config/connect-standalone.properties config/http-connect.config
    

Kafka Connect launches a standalone instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.

When events are ingested successfully, you can see them in the table details in the Polaris UI. See Monitor streaming for information about streaming errors and streaming health.

Configure distributed instances of the connector

To launch distributed instances of the connector, start the Kafka Connect worker. Then submit the connector configuration to the workers using the Kafka Connect REST API.

  1. Navigate to the Kafka Connect directory. For example:

    cd /usr/local/kafka-connect/
    
  2. Verify the configuration in config/connect-distributed.properties. Check the Kafka host and port, which defaults to bootstrap.servers=localhost:9092. Confirm that plugin.path includes the directory where your connectors are installed. For example:

    plugin.path=/usr/local/kafka-connect/connectors/
    
  3. Launch the worker. For example:

    bin/connect-distributed.sh config/connect-distributed.properties
    
  4. Send a POST request containing the Kafka connector configuration to Kafka Connect. The configuration is similar to that of the standalone connector.

    For example, where the worker is listening on localhost:8083:

    curl --location --request POST 'http://localhost:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "name": "PolarisHttpSinkConnector",
      "config": {
        "connector.class": "io.imply.kafka.connect.http.HttpSinkConnector",
        "topics": "KAFKA_TOPICS",
        "tasks.max": 1,
        "http.url": "POLARIS_CONNECTION_URL",
        "http.authorization.type": "oauth2",
        "oauth2.access.token.url": "https://id.imply.io/auth/realms/ORGANIZATION_ID/protocol/openid-connect/token",
        "oauth2.client.id": "API_CLIENT_ID",
        "oauth2.client.secret": "API_CLIENT_SECRET",
        "oauth2.client.authorization.mode": "URL",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.result.topic.name": "success-responses",
        "reporter.result.topic.replication.factor": 1,
        "reporter.error.topic.name": "error-responses",
        "reporter.error.topic.replication.factor": 1
      }
    }'
    

    Configure the following properties in your HTTP POST request:

    • topics: List of one or more Kafka topics for the connector to read from. For example, example_topic, wiki_edits.
    • tasks.max: For production scenarios, increase the maximum number of connector tasks to the number of CPU cores available to the Kafka connector. For example, if you deploy the connector to 3 servers and each server has 8 CPU cores, set this value to 24 (3x8), unless that exceeds the number of partitions. Each task is assigned to a thread, and each task can handle a single Kafka partition.
    • http.url: For a single topic, the URL of the Polaris streaming connection. For example, https://example.us-east-1.aws.api.imply.io/v1/events/kafka-connector. To map one or more topics to connections using topic.endpoint.mapping, use the base URL for the Events API. For example: https://example.us-east-1.aws.api.imply.io/v1/events/.
    • topic.endpoint.mapping: A mapping of Kafka event streams to Polaris connection endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector, wiki_updates: kafka-connector. You cannot map a single topic to multiple endpoints.
    • oauth2.access.token.url: OAuth URL for Polaris. Replace ORGANIZATION_ID with your own organization ID.
    • oauth2.client.id: Polaris OAuth client ID.
    • oauth2.client.secret: Polaris OAuth client secret.
    • reporter.bootstrap.servers: Host and port for Kafka. Defaults to localhost:9092.

Kafka Connect launches a worker instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.
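
To confirm that the worker registered the connector, you can list the connectors on the worker's REST interface. For example:

curl --location --request GET 'http://localhost:8083/connectors'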

Troubleshooting tips

The following tips can help you troubleshoot issues with the Kafka connector.

Check the connector status

If you are running Kafka Connect in distributed mode, you can check the status of the connector using the Tasks API. For example:

curl --location --request GET 'http://localhost:8083/connectors/PolarisHttpSinkConnector/tasks/0/status' 

Review the status for any errors. A running connector returns a response like the following:

{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "192.168.4.23:8083"
}

See the Confluent documentation for the Tasks API.
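
If a task reports a FAILED state, you can restart the connector through the Kafka Connect REST API after you address the underlying error. For example:

curl --location --request POST 'http://localhost:8083/connectors/PolarisHttpSinkConnector/restart'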

Check for streaming errors in Polaris

Within Polaris, navigate to MONITORING > Streaming to view the Streaming dashboard. Look for the number of Unparseable events and for Events arrived too late.

If you have unparseable events, verify your ingestion job configuration to make sure your columns are defined and mapped properly. You can also check the ingestion job log.

If you see events that arrived too late, the data is older than seven days. In this case, consider exporting your data to a file and using batch ingestion.

Check the ingestion job log

If you see unparseable events, check the ingestion job log for details as follows:

  1. From the table view, click the ... (Actions) button and select View jobs.
  2. Find your streaming ingestion job and double-click it to open the Job details.
  3. Review the Log. Look for indications of misconfiguration. For example, if your __time column is incorrectly mapped, you may see "Error: Could not transform value for __time."

Learn more

See the following topics for more information:

  • Push event data for instructions on using the Events API.
  • Apache Kafka Connect docs for information about Kafka Connect.
  • Confluent Kafka Connect docs for information about Confluent's distribution of Kafka Connect.