Ingest data from Apache Kafka and Amazon MSK by API
You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Apache Kafka streams, including from Amazon Managed Streaming for Apache Kafka (MSK). Amazon MSK is a fully managed, cloud-native service for Apache Kafka.
This topic covers the process to create a connection to Kafka or MSK.
Prerequisites
This topic assumes you have an active Apache Kafka instance that is reachable on the public internet, or an Amazon MSK instance running in your AWS account.
To set up ingestion from Kafka, you need:
- Apache Kafka events to ingest into Polaris. See Supported data and file formats for the supported data formats for streaming ingestion. Each event timestamp must be within 30 days of ingestion time; Polaris rejects events with timestamps older than 30 days. To ingest older data, use batch file ingestion. For one way to produce test events, see the sketch after this list.
- The name of the Kafka topic that contains the event data.
- Bootstrap server information: a list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster, in the form `host1:port1,host2:port2,...`. For details on where to find the bootstrap server in Amazon MSK, see Getting the bootstrap brokers using the AWS Management Console.
- An Apache Kafka username and password for Polaris to use to make the connection. Polaris supports Simple Authentication and Security Layer (SASL) PLAIN and SASL SCRAM. For SASL SCRAM connections, you must also provide the SCRAM mechanism, either `SCRAM-SHA-256` or `SCRAM-SHA-512`. Polaris does not currently support IAM authentication for Amazon MSK.
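For illustration, the following minimal sketch produces a few JSON test events to a Kafka topic over SASL PLAIN using the kafka-python client. The bootstrap server, topic name, and event field names are placeholder assumptions, not values your cluster requires; adjust the security settings to match your setup.

```python
import json
import os
import time

from kafka import KafkaProducer  # pip install kafka-python

# Placeholder connection details: replace the bootstrap server, topic,
# and security settings with your cluster's values.
producer = KafkaProducer(
    bootstrap_servers="my-kafka-cluster.example.com:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.getenv("KAFKA_USERNAME"),
    sasl_plain_password=os.getenv("KAFKA_PASSWORD"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Event timestamps must be within 30 days of ingestion time,
# so these test events use the current time in epoch milliseconds.
for i in range(3):
    producer.send("my_topic", {"timestamp": int(time.time() * 1000), "id": i})

producer.flush()
```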
From the Polaris side, you need the following:
- A Polaris API key with the `ManageConnections` permission. If you are also creating tables or ingestion jobs, you need `ManageTables` and `ManageIngestionJobs`, respectively. The examples in this topic use a variable named `POLARIS_API_KEY` to store the API key.
- A destination table to store events from Apache Kafka.
Create a connection to Kafka
Each connection is associated with a single source of data. If you plan to ingest data from multiple topics, create a new connection for each one.
Send a `POST` request to the `/v2/connections` endpoint to create a connection.
Set the following properties in the payload:
- `type`: `kafka`
- `name`: an arbitrary descriptive name for your connection
- `bootstrapServers`: the list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster, in the form `host1:port1,host2:port2,...`
- `topicName`: the name of the Kafka topic to read from
See the Prerequisites and the Connections v2 API documentation for a description of required parameters.
Sample request
The following example request creates a connection named `my_connection` to topic `my_topic` in Kafka with a bootstrap server of `my-kafka-cluster.example.com:9092`.
```bash
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
}'
```
```python
import os
import json
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

# Read the Polaris API key from the environment
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.post(url, headers=headers, data=payload)
print(response.text)
```
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
```json
{
    "type": "kafka",
    "name": "my_connection",
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "topicName": "my_topic"
}
```
Add your credentials to the connection
Send a `PUT` request to the `/v2/connections/{name}/secrets` endpoint to add your SASL credentials to the connection. Replace `{name}` in the path with the name of your connection in Polaris, for example, `my_connection`.
Set the following properties in the payload:
- `type`: either `sasl_plain` or `sasl_scram`
- `username`: the SASL username for your Kafka cluster
- `password`: the SASL password for your Kafka cluster
- `mechanism`: for SASL SCRAM only, the SCRAM mechanism, either `SCRAM-SHA-256` or `SCRAM-SHA-512`
See the Connections v2 API documentation for a description of required parameters.
Sample request
The following example request adds a Kafka username and password to a connection named `my_connection`:
```bash
curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "sasl_plain",
    "username": "$KAFKA_USERNAME",
    "password": "$KAFKA_PASSWORD"
}'
```
```python
import os
import json
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets"

# Read the Polaris API key and Kafka credentials from the environment
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "sasl_plain",
    "username": os.getenv("KAFKA_USERNAME"),
    "password": os.getenv("KAFKA_PASSWORD")
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.put(url, headers=headers, data=payload)
print(response.text)
```
Sample response
A successful request returns a `200 OK` response.
Test a connection
You can send a `POST` request to the `/v2/connections/{name}/test` endpoint to test a connection.
If the connection fails, the response includes detailed information about the reason for the failure, so you can update the connection and try again. Use the `/v2/connections/{name}` and `/v2/connections/{name}/secrets` endpoints to update your connection. See the Connections v2 API documentation for details.
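For example, a minimal sketch of a test request for `my_connection` might look like the following, using the same authentication pattern as the other examples in this topic. The empty request body is an assumption; see the Connections v2 API documentation for the authoritative request format.

```python
import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/test"
apikey = os.getenv("POLARIS_API_KEY")

# The empty request body here is an assumption; check the Connections v2
# API documentation for the authoritative request format.
response = requests.post(url, headers={'Authorization': f'Basic {apikey}'})
print(response.status_code, response.text)
```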
Start an ingestion job
After you successfully create a connection to Kafka, create an ingestion job to start ingesting data.
Send a `POST` request to the `/v2/jobs` endpoint to create an ingestion job that uses your connection.
Set the connection information in the `source` object in the ingestion job payload. For example:
"source": {
"type": "connection",
"connectionName": "my_connection",
"formatSettings": {
"format": "nd-json"
},
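As a rough sketch, the request below posts a job payload containing this `source` object. Only the `source` object is defined in this topic; the placeholder comment stands in for the other required job properties, which are described in the Jobs v2 API documentation.

```python
import os
import json
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    # Only the "source" object is defined in this topic. Add the remaining
    # required job properties (job type, target table, input schema, and so
    # on) as described in the Jobs v2 API documentation.
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        }
    }
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.post(url, headers=headers, data=payload)
print(response.text)
```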
Learn more
See the following topics for more information:
- Create a table by API for information on creating and managing tables.
- Connections v2 API for information on creating and managing connections.
- Create an ingestion job by API for information on creating and managing ingestion jobs.