Skip to main content

Ingest events with the Kafka Connector for Imply Polaris

info

Project-less regional API resources have been deprecated and will be removed by the end of September 2024.

You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023. For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID

For backward compatibility, you can continue to use project-less regional API resources on projects created prior to September 29, 2023. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.

Imply provides a Kafka Connector for Imply Polaris (Kafka connector). You can use the Kafka Connector to publish your event stream to Polaris with the Polaris Events API.

The Kafka connector provides exactly once event ingestion during normal operation. During failure recovery scenarios, it provides at-least once event ingestion with the possibility of some duplicate rows.

This topic covers the installation and configuration instructions to run the Kafka connector. See Kafka connector configuration reference for a list of configuration options.

How the Kafka connector works

To send events to Polaris, configure Kafka Connect to read data from a Kafka topic (source) and publish data to a Polaris streaming connection (sink).

The following diagram shows a sample configuration. The Kafka connector reads from three Kafka topics and pushes the events to two Polaris push streaming connections:

Kafka connector diagram

The Kafka connector polls Kafka for data. Before sending data to Polaris, it creates batches of events and compresses them. The default batch size is 500 KB before compression. For example, if the connector receives 2 MB of data from Kafka, it divides the data into four batches of 500 KB each before automatically compressing the data and sending it to Polaris. If the connector receives less than 500 KB from Kafka within a polling period, it sends the data to Polaris regardless.

The examples in this topic assume Kafka Connect is installed to /usr/local/kafka-connect/. They use "example" for the Polaris organization name.

Prerequisites

Before you set up the Kafka connector, familiarize yourself with the basics of Kafka Connect. See the following:

To set up the Kafka connector, you need the following:

  • A Polaris API key with the ManageConnections permission. To create an ingestion job, you need the ManageIngestionJobs permission. Use the API key to configure the HTTP header for the connector.

    To use OAuth authentication, see HTTP sink connector authorization.

  • A Push streaming source connection as a destination for the Kafka connector. The connection includes the Events API endpoint to send data to your table in Polaris. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector. For more information, see Push event data.

  • Kafka version 2.6.3 or later with Kafka Connect running the same version. If you are running Kafka with a version prior to 2.6.3, you must deploy a new version of Kafka 2.6.3 or later.

  • Administrative access and configuration details for your Kafka cluster and the event stream topic to publish to Polaris.

  • Kafka Connect running either as a standalone process or as a distributed service. To bootstrap a deployment of a Confluent Kafka Connect, see CP-Kafka Connect Helm Chart.

  • Java 11 or higher

Supported input types

You can use the input data types supported by Kafka Connect converters.

Make sure your Kafka Connect instance is set up with the correct converters (key.converter and value.converter) for your data set.

Regardless of the input type, the Kafka connector converts the data to JSON before pushing it to the Events API

For more detail, see the following topics:

Time column requirements and configuration

Polaris requires events to contain an ISO-formatted timestamp column. See Timestamp expressions for examples on transforming an existing timestamp or how to handle missing timestamps.

Configure Kafka Connect

The steps in this section describe how to configure a connectors directory for Kafka Connect. If you already have Kafka Connect set up and are familiar with how Kafka Connect loads connectors, you can skip this section.

  1. Make a directory called connectors to store the connector. For example:
    mkdir -p /usr/local/kafka-connect/connectors
  2. Edit the Kafka Connect configuration file:
    • For standalone mode: /usr/local/kafka-connect/config/connect-standalone.properties.
    • For distributed mode: /usr/local/kafka-connect/config/connect-distributed.properties.
  3. Set the value for the plugin path to the directory containing the connector. For example:
    plugin.path=/usr/local/kafka-connect/connectors/ 
  4. Verify that bootstrap.servers points to your Kafka host and port. Defaults to localhost:9092.

For additional information, including documentation for distributed configuration properties, see Kafka Connect Configurations.

Install the Kafka connector

The steps in this section describe how to install the Kafka connector. If you run the connector in distributed mode, perform the steps for each Kafka Connect node.

  1. Navigate to the connectors directory. For example:
    cd /usr/local/kafka-connect/connectors
  2. Download the Kafka connector to the machines running Kafka Connect workers. For example:
    curl -o polaris-kafka-connector-1.0.0.zip https://static.imply.io/support/polaris-kafka-connector-1.0.0.zip
  3. Extract the connector.
    unzip polaris-kafka-connector-1.0.0.zip
    After you finish installing the connector, proceed with configuration.

Create a streaming ingestion job

Create a streaming ingestion job to ingest from your push streaming connection. You can add an ingestion job in the UI or using the API. See Start a job to ingest data for more detail.

Configure a standalone Kafka connector

It is possible to run the Kafka connector in standalone mode. This mode is good for learning to use the connector or testing the connector. It is not suitable for most production environments.

The distribution of the Kafka connector doesn't include a configuration file. This section helps you download and edit the connector configuration for a standalone instance.

  1. Navigate to your Kafka Connect configuration directory. For example:

    cd /usr/local/kafka-connect/config
  2. Download the Kafka connector configuration file: http-connect.config. For example:

    curl -O https://docs.imply.io/polaris/assets/files/http-connect.config
  3. Edit the following properties in the configuration file. The authentication properties listed here are for API key authentication. For OAuth properties, see HTTP sink connector authorization.

    • topics: a list of one or more topics the Kafka topics the connector subscribes to. For example: topics: example_topic, wiki_edits.
    • http.url: For a single topic, the Polaris streaming connector URL. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector. To map one or more topics to a single Polaris streaming connector, or to map multiple topics, the base URL for the Events API. For example: https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/.
    • topic.endpoint.mapping: A mapping of Kafka event streams to Polaris connector endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector. You cannot map a single topic to multiple endpoints.
    • http.authorization.type: Set to static.
    • http.headers.authorization: Set to Basic API_KEY. Substitute the value of your API key for API_KEY.
    • reporter.bootstrap.servers: host and port for Kafka. Defaults to localhost:9092.

Launch a standalone connector

Start the connector in standalone mode as follows:

  1. Navigate to the Kafka Connect directory. For example:
    cd /usr/local/kafka-connect/
  2. Run the standalone connector as follows:
    bin/connect-standalone.sh config/connect-standalone.properties config/http-connect.config

Kafka connect launches a standalone instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.

When events ingest successfully, you can see them in the table details in the Polaris UI. See Monitor streaming for information about streaming errors and streaming health.

Configure distributed instances of the connector

To launch distributed instances of the connector, start the Kafka Connect worker. Then you can submit the configuration for connector to the workers with the REST API.

  1. Navigate to the Kafka Connect directory. For example:

    cd /usr/local/kafka-connect/
  2. Verify the configurations in config/connect-distributed.properties. Check the Kafka host and port which defaults to bootstrap.servers=localhost:9092. Confirm that the plugin.paths includes the directory where your connectors are installed. For example:

    plugin.path=/usr/local/kafka-connect/connectors/
  3. Launch the worker. For example:

    bin/connect-distributed.sh config/connect-distributed.properties
  4. Send a POST request containing the Kafka connector configuration to Kafka Connect. The configuration is similar to the standalone connector. The authentication properties listed here are for API key authentication. For OAuth properties, see HTTP sink connector authorization.

    For example, where the worker is listening on the localhost:8083:

    curl --location --request POST "http://localhost:8083/connectors" \
    --header "Content-Type: application/json" \
    --data-raw '{"name" : "PolarisHttpSinkConnector",
    "config" : {
    "connector.class": "io.imply.kafka.connect.http.HttpSinkConnector",
    "topics": "KAFKA_TOPICS",
    "tasks.max": 1,
    "http.url": "POLARIS_CONNECTION_URL",
    "http.authorization.type": "static",
    "http.headers.authorization": "Basic API_KEY",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor":1,
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": 1
    }
    }'

    Configure the following properties in your HTTP POST request:

    • topics: List of one or more Kafka topics for the connector to read from. For example, example_topic, wiki_edits.
    • tasks.max: For production scenarios, increase the maximum number of connector tasks to the number of CPU cores available to the Kafka connector. For example, if you deploy the connector to 3 servers and each server has 8 CPU cores set this value to 24 (3x8) unless it exceeds the number of partitions. Each task is assigned to a thread. Each task is capable of handling a single Kafka partition.
    • http.url: For a single topic, the Polaris streaming connector URL. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECTS_ID/events/kafka-connector To map one or more topics to a single Polaris streaming connector, or to map multiple topics, the base URL for the Events API. For example: https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/.
    • topic.endpoint.mapping : A mapping of Kafka event streams to Polaris connector endpoints. A mapping of Kafka event streams to Polaris connector endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector, wiki_updates:kafka-connector. You cannot map a single topic to multiple endpoints.
    • http.authorization.type: Set to static.
    • http.headers.authorization: Set to Basic API_KEY. Substitute the value of your API key for API_KEY.
    • reporter.bootstrap.servers: Host and port for Kafka. Defaults to localhost:9092.

Kafka Connect launches a worker instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.

Troubleshooting tips

The following are some ideas to help you troubleshoot issues with the Kafka connector.

Check the connector status

If you are running Kafka Connect in distributed mode, you can check the status of the connector using the Tasks API. For example:

curl --location --request GET "http://localhost:8083/connectors/PolarisHttpSinkConnector/tasks/0/status" 

Review the status for any errors. A running connector returns the following response:

{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.4.23:8083"
}

See the Confluent documentation for the Tasks API.

Check for streaming errors in Polaris

Within Polaris, navigate to MONITORING > Streaming to view the Streaming dashboard. Look for the number of Unparseable events and for Events arrived too late.

If you have unparseable events, verify your ingestion job configuration to make sure your columns are defined and mapped properly. You can also check the ingestion job log.

If you see events arrived too late, it means your data is older than the allowed threshold. In this case consider exporting your data to a file and using batch ingestion. See Late arriving event data.

Check the ingestion job log

If you see unparseable events, check the ingestion job log for details as follows:

  1. From the table view click the ...(Actions) button and select View jobs.
  2. Find your streaming ingestion job and double-click it to open the Job details.
  3. Review the Log. Look for indications of misconfiguration. For example, if your __time column is incorrectly mapped, you may see "Error: Could not transform value for __time."

Learn more

See the following topics for more information:

  • Push event data for instructions for the Events API
  • Apache Kafka Connect docs for information about Kafka Connect.
  • Confluent Kafka Connect docs for information about Confluent's distribution of Kafka Connect.