Ingest data from Apache Kafka by API
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Apache Kafka topics. Kafka connections also support Amazon Managed Streaming for Apache Kafka (MSK) and Apache Kafka on Azure Event Hubs.
This topic covers the process to create a connection and ingest data from Kafka using the Polaris API.
Prerequisites
Before you create a connection to ingest from Kafka, complete the following:
- Confirm that you have an active Apache Kafka instance that is reachable on the public internet or an Amazon MSK instance running in your AWS account.
- Verify that your events are in a supported format and that the event timestamps are within 30 days of ingestion time. If you need to ingest events older than 30 days, configure `lateMessageRejectionPeriod` accordingly in your streaming ingestion job.
- Review the required information to create a Kafka connection in Connect to Apache Kafka.
  - If you use the Kafka connection for Amazon MSK, also review Connect to Amazon MSK.
  - If you use the Kafka connection for Kafka on Event Hubs, also review Connect to Apache Kafka on Azure Event Hubs.
- If you don't have one already, create a Polaris API key with the `ManageConnections` permission. If you plan to create tables or ingestion jobs, you also need `ManageTables` and `ManageIngestionJobs`, respectively. For more information on permissions, see Permissions reference. The examples in this topic use a variable named `POLARIS_API_KEY` to store the API key.
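For example, you can store the API key in an environment variable so the requests in this topic can reference it (a minimal shell sketch; the key value is a placeholder):

```bash
# Store the Polaris API key referenced by the Authorization headers below.
# Replace the placeholder with your actual API key.
export POLARIS_API_KEY="pok_xxxx"
```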
You do not have to create a table before starting an ingestion job. When you set `createTableIfNotExists` to `true` in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Create a connection to Kafka
Send a POST request to the `/v1/projects/PROJECT_ID/connections` endpoint to create a connection.
Set the following properties in the payload:
- `type`: `kafka`
- `name`: An arbitrary descriptive name for your connection.
- `topicName`: The name of the Kafka topic to read from. You can also provide a regular expression that identifies multiple Kafka topics for the connection.
- `topicNameIsPattern`: Boolean property that determines whether `topicName` is interpreted as the exact topic name (default) or a regular expression to identify multiple topics. For an example, see the fragment after this list.
- `bootstrapServers`: A list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster, in the form `host1:port1,host2:port2,...`
- `secrets`: Method and details for authenticating the connection. For more information, see Connection authentication reference.
  - For SASL credentials, supply the following properties:
    - `type`: The SASL mechanism. Either `sasl_plain` or `sasl_scram`.
    - `username`: The SASL username for your Kafka cluster.
    - `password`: The SASL password for your Kafka cluster.
    - `mechanism`: Only for SASL SCRAM, the SCRAM mechanism. Either `SCRAM-SHA-256` or `SCRAM-SHA-512`.
  - For MSK clusters that use IAM access control, provide the ARN of the IAM role that is authorized to access your MSK data. Supply the following properties:
    - `type`: `aws_iam`
    - `awsAssumedRoleArn`: The ARN of the IAM role for Imply to assume.
- `clientRack`: (optional) Kafka rack ID. If you've configured rack awareness in Kafka and you want Kafka to connect to a broker matching a specific rack, enter the rack ID in this property. See the Kafka documentation for information on balancing replicas across racks.
- `ssl`: (optional) One or more server certificates for Polaris to trust, based on your networking setup.
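For example, to read from every topic whose name matches a pattern rather than a single named topic, the connection payload might include the following (an illustrative fragment; the `metrics-.*` pattern is a placeholder):

```json
"topicName": "metrics-.*",
"topicNameIsPattern": true
```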
See the Prerequisites and the Connections v1 API documentation for a description of required parameters.
SASL credentials
The following example request creates a connection named `my_connection` to topic `my_topic` in Kafka with a bootstrap server of `my-kafka-cluster.example.com:9092`, authenticated with SASL credentials. The request sets a `clientRack` ID and includes `ssl` server certificates to trust.
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "kafka",
  "name": "my_connection",
  "topicName": "my_topic",
  "bootstrapServers": "my-kafka-cluster.example.com:9092",
  "secrets": {
    "type": "sasl_plain",
    "username": "$KAFKA_USERNAME",
    "password": "$KAFKA_PASSWORD"
  },
  "clientRack": "use1-az4",
  "ssl": {
    "truststore": {
      "type": "pem",
      "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----"
    }
  }
}'

Python:

import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
kafka_username = os.getenv("KAFKA_USERNAME")
kafka_password = os.getenv("KAFKA_PASSWORD")
payload = json.dumps({
  "type": "kafka",
  "name": "my_connection",
  "topicName": "my_topic",
  "bootstrapServers": "my-kafka-cluster.example.com:9092",
  "secrets": {
    "type": "sasl_plain",
    "username": kafka_username,
    "password": kafka_password
  },
  "clientRack": "use1-az4",
  "ssl": {
    "truststore": {
      "type": "pem",
      "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----"
    }
  }
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
{
    "secrets": {
        "username": "$KAFKA_USERNAME",
        "type": "sasl_plain"
    },
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "clientRack": "use1-az4",
    "ssl": {
        "truststore": {
            "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----",
            "type": "pem"
        }
    },
    "topicName": "my_topic",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "modifiedOnTimestamp": "2023-08-22T20:26:21.101249Z",
    "name": "my_connection",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "submittedOnTimestamp": "2023-08-22T20:26:21.101249Z",
    "type": "kafka",
    "description": null
}
AWS IAM credentials
Kafka connections to Amazon MSK support authentication using IAM credentials.
The following example request creates a connection named `my_connection` with AWS IAM credentials:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "kafka",
  "name": "my_connection",
  "topicName": "my_topic",
  "bootstrapServers": "my-kafka-cluster.example.com:9092",
  "secrets": {
    "type": "aws_iam",
    "awsAssumedRoleArn": "arn:aws:iam::123456789012:role/msk_client_role"
  },
  "clientRack": "use1-az4",
  "ssl": {
    "truststore": {
      "type": "pem",
      "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----"
    }
  }
}'

Python:

import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
  "type": "kafka",
  "name": "my_connection",
  "topicName": "my_topic",
  "bootstrapServers": "my-kafka-cluster.example.com:9092",
  "secrets": {
    "type": "aws_iam",
    "awsAssumedRoleArn": "arn:aws:iam::123456789012:role/msk_client_role"
  },
  "clientRack": "use1-az4",
  "ssl": {
    "truststore": {
      "type": "pem",
      "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----"
    }
  }
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
{
    "secrets": {
        "awsAssumedRoleArn": "arn:aws:iam::123456789012:role/msk_client_role",
        "type": "aws_iam"
    },
    "bootstrapServers": "my-kafka-cluster.example.com:9092",
    "clientRack": "use1-az4",
    "ssl": {
        "truststore": {
            "certificates": "-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----",
            "type": "pem"
        }
    },
    "topicName": "my_topic",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "modifiedOnTimestamp": "2023-08-22T20:30:05.956192Z",
    "name": "my_connection",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "submittedOnTimestamp": "2023-08-22T20:30:05.956192Z",
    "type": "kafka",
    "description": null
}
Test a connection
You can send a POST request to the `/v1/projects/PROJECT_ID/connections/{name}/test` endpoint to test a connection.
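For example, the following request tests the connection created above, using the same URL and header pattern as the earlier requests:

```bash
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/my_connection/test" \
--header "Authorization: Basic $POLARIS_API_KEY"
```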
If the request fails, the response includes detailed information about the reason for the test failure. You can then update the connection and try again. Use the `/v1/projects/PROJECT_ID/connections/{name}` endpoint to update your connection, as sketched below.
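The following is a hedged sketch of an update request, assuming the endpoint accepts a PUT with the same payload shape as connection creation; verify the exact method and schema in the Connections v1 API documentation:

```bash
# Assumption: the update endpoint accepts PUT with a creation-style payload.
# The new bootstrap server address is a placeholder for illustration.
curl --location --request PUT "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/my_connection" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "kafka",
  "name": "my_connection",
  "topicName": "my_topic",
  "bootstrapServers": "my-kafka-cluster-new.example.com:9092"
}'
```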
See the Connections v1 API documentation for details.
Start an ingestion job
After you successfully create a connection to Kafka, create an ingestion job to start ingesting data.
Send a POST request to the `/v1/projects/PROJECT_ID/jobs` endpoint to create a streaming ingestion job that uses your connection.
Set the connection information in the source object in the ingestion job payload. For example:
"source": {
    "type": "connection",
    "connectionName": "my_connection",
    "formatSettings": {
        "format": "nd-json"
    }
}
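For example, the following request creates a streaming ingestion job that reads from `my_connection` and writes to a table named `my_table`. This is a minimal sketch: the table name is a placeholder, and input schema and column mapping settings are omitted; see the Jobs v1 API documentation for the full job spec.

```bash
# Minimal streaming job sketch. "my_table" is a placeholder table name;
# input schema and column mappings are omitted for brevity.
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "my_table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "my_connection",
    "formatSettings": {
      "format": "nd-json"
    }
  }
}'
```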
See the Jobs v1 API documentation for a description of required parameters.
To also ingest event metadata including the Kafka key or headers, see Ingest Kafka records and metadata by API.
Learn more
See the following topics for more information:
- Connect to Apache Kafka, Connect to Amazon MSK, and Connect to Apache Kafka on Azure Event Hubs for reference on creating Kafka connections.
- Create a streaming ingestion job for details on creating streaming ingestion jobs in Polaris.
- Ingest Kafka records and metadata to ingest Kafka metadata along with message values.
- Connections v1 API for information on creating and managing connections.