Ingest events with the Kafka Connector for Imply Polaris
Imply provides a Kafka Connector for Imply Polaris (Kafka connector). You can use the Kafka connector to publish your event stream to Polaris through the Polaris Events API.
The Kafka connector provides exactly-once event ingestion during normal operation. During failure recovery scenarios, it provides at-least-once event ingestion with the possibility of some duplicate rows.
This topic covers the installation and configuration instructions to run the Kafka connector. See Kafka connector configuration reference for a list of configuration options.
Project-less regional API resources have been deprecated and will be removed by the end of September 2024. See Migrate to project-scoped URL for more information.
How the Kafka connector works
To send events to Polaris, configure Kafka Connect to read data from a Kafka topic (source) and publish data to a Polaris streaming connection (sink).
The following diagram shows a sample configuration, in which the Kafka connector reads from three Kafka topics and pushes the events to two Polaris push streaming connections.
The Kafka connector polls Kafka for data. Before sending data to Polaris, it creates batches of events and compresses them. The default batch size is 500 KB before compression. For example, if the connector receives 2 MB of data from Kafka, it divides the data into four batches of 500 KB each before automatically compressing the data and sending it to Polaris. If the connector receives less than 500 KB from Kafka within a polling period, it sends the data to Polaris regardless.
The examples in this topic assume Kafka Connect is installed to /usr/local/kafka-connect/ and use example as the Polaris organization name.
Prerequisites
Before you set up the Kafka connector, familiarize yourself with the basics of Kafka Connect. See the following:
- Apache Kafka Connect docs
- Confluent's documentation for Kafka Connect
To set up the Kafka connector, you need the following:
- A Polaris API key with the ManageConnections permission. To create an ingestion job, you also need the ManageIngestionJobs permission. Use the API key to configure the HTTP header for the connector. To use OAuth authentication, see HTTP sink connector authorization.
- A Push streaming source connection as a destination for the Kafka connector. The connection includes the Events API endpoint to send data to your table in Polaris. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector. For more information, see Push event data.
- Kafka version 2.6.3 or later, with Kafka Connect running the same version. If you are running a version of Kafka prior to 2.6.3, you must deploy Kafka 2.6.3 or later.
- Administrative access and configuration details for your Kafka cluster and the event stream topic to publish to Polaris.
- Kafka Connect running either as a standalone process or as a distributed service. To bootstrap a deployment of Confluent Kafka Connect, see CP-Kafka Connect Helm Chart.
- Java 11 or higher.
Supported input types
You can use the input data types supported by Kafka Connect converters. Make sure your Kafka Connect instance is set up with the correct converters (key.converter and value.converter) for your data set.
Regardless of the input type, the Kafka connector converts the data to JSON before pushing it to the Events API.
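For example, if your topic contains string keys and schemaless JSON values, the Kafka Connect worker properties might include converter settings like the following sketch. The converter classes ship with Apache Kafka; treat the exact choices as assumptions to adjust for your own data format:
# Hypothetical converter settings for string keys and schemaless JSON values
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
If your data is Avro with a schema registry, you would instead point the converters at io.confluent.connect.avro.AvroConverter and set value.converter.schema.registry.url, as described in the Schema Registry documentation linked below.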
For more detail, see the following topics:
- Converters and Serialization Explained
- Configuring Key and Value Converters
- Using Kafka Connect with Schema Registry
Time column requirements and configuration
Polaris requires events to contain an ISO-formatted timestamp column. See Timestamp expressions for examples on transforming an existing timestamp or how to handle missing timestamps.
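For example, the following hypothetical event satisfies the timestamp requirement. The field names are placeholders; in your ingestion job, you map the ISO-formatted timestamp field to the __time column:
{"timestamp": "2024-01-01T12:34:56.789Z", "channel": "#en.wikipedia", "delta": 42}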
Configure Kafka Connect
The steps in this section describe how to configure a connectors directory for Kafka Connect. If you already have Kafka Connect set up and are familiar with how Kafka Connect loads connectors, you can skip this section.
- Make a directory called connectors to store the connector. For example:
mkdir -p /usr/local/kafka-connect/connectors
- Edit the Kafka Connect configuration file:
  - For standalone mode: /usr/local/kafka-connect/config/connect-standalone.properties
  - For distributed mode: /usr/local/kafka-connect/config/connect-distributed.properties
- Set plugin.path to the directory containing the connector. For example:
plugin.path=/usr/local/kafka-connect/connectors/
- Verify that bootstrap.servers points to your Kafka host and port. Defaults to localhost:9092.
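After these edits, the relevant lines of connect-standalone.properties might look like the following sketch. The host, plugin path, and offset storage location shown here are assumptions based on the examples in this topic:
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/usr/local/kafka-connect/connectors/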
For additional information, including documentation for distributed configuration properties, see Kafka Connect Configurations.
Install the Kafka connector
The steps in this section describe how to install the Kafka connector. If you run the connector in distributed mode, perform the steps for each Kafka Connect node.
- Navigate to the connectors directory. For example:
cd /usr/local/kafka-connect/connectors
- Download the Kafka connector to the machines running Kafka Connect workers. For example:
curl -o polaris-kafka-connector-1.0.0.zip https://static.imply.io/support/polaris-kafka-connector-1.0.0.zip
- Extract the connector. For example:
unzip polaris-kafka-connector-1.0.0.zip
After you finish installing the connector, proceed with configuration.
Create a streaming ingestion job
Create a streaming ingestion job to ingest from your push streaming connection. You can add an ingestion job in the UI or using the API. See Start a job to ingest data for more detail.
Configure a standalone Kafka connector
You can run the Kafka connector in standalone mode. Standalone mode is useful for learning to use the connector or for testing, but it is not suitable for most production environments.
The distribution of the Kafka connector doesn't include a configuration file. This section helps you download and edit the connector configuration for a standalone instance.
- Navigate to your Kafka Connect configuration directory. For example:
cd /usr/local/kafka-connect/config
- Download the Kafka connector configuration file, http-connect.config. For example:
curl -O https://docs.imply.io/polaris/assets/files/http-connect.config
- Edit the following properties in the configuration file. The authentication properties listed here are for API key authentication. For OAuth properties, see HTTP sink connector authorization.
  - topics: A list of one or more Kafka topics the connector subscribes to. For example: topics: example_topic, wiki_edits.
  - http.url: For a single topic, the Polaris streaming connection URL. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector. To map one or more topics to endpoints with topic.endpoint.mapping, use the base URL for the Events API instead. For example: https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/.
  - topic.endpoint.mapping: A mapping of Kafka event streams to Polaris connector endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector. You cannot map a single topic to multiple endpoints.
  - http.authorization.type: Set to static.
  - http.headers.authorization: Set to Basic API_KEY. Substitute the value of your API key for API_KEY.
  - reporter.bootstrap.servers: Host and port for Kafka. Defaults to localhost:9092.
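Putting these together, a completed http-connect.config for a single topic might look like the following sketch. It assumes the file uses standard Java properties syntax; the connector name, topic, URL, and reporter topics are placeholders to replace with your own values, and your downloaded file may contain additional properties:
name=PolarisHttpSinkConnector
connector.class=io.imply.kafka.connect.http.HttpSinkConnector
tasks.max=1
topics=example_topic
http.url=https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector
http.authorization.type=static
http.headers.authorization=Basic API_KEY
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1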
Launch a standalone connector
Start the connector in standalone mode as follows:
- Navigate to the Kafka Connect directory. For example:
cd /usr/local/kafka-connect/
- Run the standalone connector as follows:
bin/connect-standalone.sh config/connect-standalone.properties config/http-connect.config
Kafka Connect launches a standalone instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.
When events are ingested successfully, you can see them in the table details in the Polaris UI. See Monitor streaming for information about streaming errors and streaming health.
Configure distributed instances of the connector
To launch distributed instances of the connector, start the Kafka Connect worker. Then you can submit the connector configuration to the workers with the REST API.
- Navigate to the Kafka Connect directory. For example:
cd /usr/local/kafka-connect/
- Verify the configuration in config/connect-distributed.properties. Check the Kafka host and port, which defaults to bootstrap.servers=localhost:9092. Confirm that plugin.path includes the directory where your connectors are installed. For example:
plugin.path=/usr/local/kafka-connect/connectors/
- Launch the worker. For example:
bin/connect-distributed.sh config/connect-distributed.properties
- Send a POST request containing the Kafka connector configuration to Kafka Connect. The configuration is similar to the standalone connector configuration. The authentication properties listed here are for API key authentication. For OAuth properties, see HTTP sink connector authorization. For example, where the worker is listening on localhost:8083:
curl --location --request POST "http://localhost:8083/connectors" \
--header "Content-Type: application/json" \
--data-raw '{"name" : "PolarisHttpSinkConnector",
"config" : {
"connector.class": "io.imply.kafka.connect.http.HttpSinkConnector",
"topics": "KAFKA_TOPICS",
"tasks.max": 1,
"http.url": "POLARIS_CONNECTION_URL",
"http.authorization.type": "static",
"http.headers.authorization": "Basic API_KEY",
"reporter.bootstrap.servers": "localhost:9092",
"reporter.result.topic.name": "success-responses",
"reporter.result.topic.replication.factor":1,
"reporter.error.topic.name": "error-responses",
"reporter.error.topic.replication.factor": 1
}
}'

Configure the following properties in your HTTP POST request:
- topics: A list of one or more Kafka topics for the connector to read from. For example: example_topic, wiki_edits.
- tasks.max: For production scenarios, increase the maximum number of connector tasks to the number of CPU cores available to the Kafka connector. For example, if you deploy the connector to 3 servers and each server has 8 CPU cores, set this value to 24 (3x8), unless that exceeds the number of partitions. Each task is assigned to a thread, and each task can handle a single Kafka partition.
- http.url: For a single topic, the Polaris streaming connection URL. For example, https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/kafka-connector. To map one or more topics to endpoints with topic.endpoint.mapping, use the base URL for the Events API instead. For example: https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/.
- topic.endpoint.mapping: A mapping of Kafka event streams to Polaris connector endpoints. Only use this when mapping multiple streams and endpoints. For example: wiki_edits: kafka-connector, wiki_updates: kafka-connector. You cannot map a single topic to multiple endpoints.
- http.authorization.type: Set to static.
- http.headers.authorization: Set to Basic API_KEY. Substitute the value of your API key for API_KEY.
- reporter.bootstrap.servers: Host and port for Kafka. Defaults to localhost:9092.
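For example, a hypothetical variant of the POST body above that maps two topics to the same kafka-connector endpoint would replace http.url with the base Events API URL and add topic.endpoint.mapping:
"topics": "wiki_edits, wiki_updates",
"http.url": "https://example.us-east-1.aws.api.imply.io/v1/projects/PROJECT_ID/events/",
"topic.endpoint.mapping": "wiki_edits: kafka-connector, wiki_updates: kafka-connector",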
Kafka Connect launches a worker instance of the connector. The connector reads from the topic specified in the configuration and posts the event stream to the Polaris Events API.
Troubleshooting tips
The following are some ideas to help you troubleshoot issues with the Kafka connector.
Check the connector status
If you are running Kafka Connect in distributed mode, you can check the status of the connector using the Tasks API. For example:
curl --location --request GET "http://localhost:8083/connectors/PolarisHttpSinkConnector/tasks/0/status"
Review the status for any errors. A running connector returns a response similar to the following:
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.4.23:8083"
}
See the Confluent documentation for the Tasks API.
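If a task reports a FAILED state, you can restart it with the Kafka Connect REST API after you address the underlying error. For example, assuming the connector name and worker address used earlier in this topic:
curl --location --request POST "http://localhost:8083/connectors/PolarisHttpSinkConnector/tasks/0/restart"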
Check for streaming errors in Polaris
Within Polaris, navigate to MONITORING > Streaming to view the Streaming dashboard. Look for the number of Unparseable events and for Events arrived too late.
If you have unparseable events, verify your ingestion job configuration to make sure your columns are defined and mapped properly. You can also check the ingestion job log.
If you see Events arrived too late, it means your data is older than the allowed threshold. In this case, consider exporting your data to a file and using batch ingestion. See Late arriving event data.
Check the ingestion job log
If you see unparseable events, check the ingestion job log for details as follows:
- From the table view, click the ...(Actions) button and select View jobs.
- Find your streaming ingestion job and double-click it to open the Job details.
- Review the Log. Look for indications of misconfiguration. For example, if your __time column is incorrectly mapped, you may see "Error: Could not transform value for __time."
Learn more
See the following topics for more information:
- Push event data for instructions on using the Events API.
- Apache Kafka Connect docs for information about Kafka Connect.
- Confluent Kafka Connect docs for information about the Confluent distribution of Kafka Connect.