Ingest data from Confluent Cloud by API
Project-less regional API resources have been deprecated and will be removed by the end of September 2024.
You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023.
For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID
Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Confluent Cloud. Confluent Cloud is a fully managed, cloud-native service for Apache Kafka.
This topic describes how to create a connection to Confluent Cloud and then create a job to ingest data from Confluent Cloud into a Polaris table. For information on how to create connections in the Polaris UI, see Create a connection. For reference on Confluent Cloud connections in Polaris, see Ingest from Confluent Cloud.
Prerequisites
This topic assumes you have an active Confluent Cloud account. You will need the following before you start to set up ingestion from Confluent Cloud:
- Topic name. The name of the Confluent Cloud topic that contains the event data. You can also provide a regular expression that identifies multiple Kafka topics for the connection.
- Apache Kafka events to ingest into Polaris. See Supported data and file formats for the supported data formats for streaming ingestion. Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. You can use batch file ingestion if you want to ingest older data.
- Bootstrap server information. A list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster, in the form host1:port1,host2:port2,... For details on where to find the bootstrap server in Confluent Cloud, see Access cluster settings in the Confluent Cloud Console. You can also use the Confluent Cloud API to read a cluster and find its bootstrap server information in the kafka_bootstrap_endpoint field of the cluster object, as shown in the sketch after this list.
- Authentication details. A Confluent Cloud API key and secret that Polaris will use to make the connection. Note that resource-specific API keys in Confluent Cloud require an access control list that restricts access. See Use API keys to control access for more information.
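For illustration, the following cURL sketch reads a cluster through the Confluent Cloud API. It is based on assumptions: it uses the cmk/v2 cluster endpoint with a placeholder cluster ID (lkc-xxxxx), an environment ID (env-xxxxx), and a Confluent Cloud API key and secret stored in the CONFLUENT_CLOUD_KEY and CONFLUENT_CLOUD_SECRET environment variables. The bootstrap server information appears in the spec.kafka_bootstrap_endpoint field of the response.
curl --location --request GET "https://api.confluent.cloud/cmk/v2/clusters/lkc-xxxxx?environment=env-xxxxx" \
--user ${CONFLUENT_CLOUD_KEY}:${CONFLUENT_CLOUD_SECRET}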
From the Polaris side, you will also need an API key with the following permissions:
- ManageTables
- ManageConnections
- ManageIngestionJobs
In the examples below, the key value is stored in the variable named POLARIS_API_KEY.
See Authenticate with API keys to obtain an API key and assign service account permissions.
For more information on permissions, visit Permissions reference.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Create a connection to Confluent Cloud
Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.
See the Prerequisites and the Connections v1 API documentation for a description of required parameters.
Sample request
The following example request creates a connection named my_connection to topic my_topic in Confluent Cloud. The environment variables $CONFLUENT_KEY and $CONFLUENT_SECRET store the Confluent Cloud API key and secret, respectively.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "confluent",
"name": "my_connection",
"bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
"topicName": "my_topic",
"secrets": {
"type": "sasl_plain",
"username": "$CONFLUENT_KEY",
"password": "$CONFLUENT_SECRET"
}
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
confluent_key = os.getenv("CONFLUENT_KEY")
confluent_secret = os.getenv("CONFLUENT_SECRET")
payload = json.dumps({
"type": "confluent",
"name": "my_connection",
"bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
"topicName": "my_topic",
"secrets": {
"type": "sasl_plain",
"username": confluent_key,
"password": confluent_secret
}
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the connection, for example:
{
"secrets": {
"username": "$CONFLUENT_KEY",
"type": "sasl_plain"
},
"bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
"topicName": "my_topic",
"type": "confluent",
"name": "my_connection",
"description": null,
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2023-03-28T17:35:27.305019Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2023-03-28T17:35:27.305019Z"
}
Test a connection
You can send a POST request to the /v1/projects/PROJECT_ID/connections/{name}/test endpoint to test a connection.
If the request fails, the response includes detailed information about the reason for the test failure. You can then update the connection and try again. Use the /v1/projects/PROJECT_ID/connections/{name} endpoint to update your connection.
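For example, the following cURL call is a minimal sketch that tests the my_connection connection created above, assuming the same PROJECT_ID placeholder and POLARIS_API_KEY environment variable as the earlier examples:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/my_connection/test" \
--user ${POLARIS_API_KEY}: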
See the Connections v1 API documentation for details.
Start an ingestion job
Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job using your connection.
You can create multiple ingestion jobs with the same connection—for example, to ingest event data from a topic into both a detail table and an aggregate table.
Use schema auto-discovery for your streaming ingestion job ("useSchemaDiscovery": true) so that Polaris automatically detects the schema of the input data and maps the discovered fields directly to columns in the destination table. This allows you to skip defining each column in the input schema and each relationship in the mappings.
The ingestion job spec in this example enables schema auto-discovery but defines the requisite input fields and mapping for the primary timestamp, __time. Since date and time are used to generate the primary timestamp, for example combining 2023-09-01 and 02:47:05.474Z into 2023-09-01T02:47:05.474Z, the job spec excludes them from being created as their own columns by adding them to dimensionExclusions.
See the Jobs v1 API documentation for a description of required parameters.
Sample request
The following example request creates an ingestion job for the sample-confluent-ingestion table using the my_connection connection. Use source type connection for an ingestion job that connects to Confluent Cloud.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data '{
"type": "streaming",
"source": {
"type": "connection",
"connectionName": "my_connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
}
],
"target": {
"type": "table",
"tableName": "sample-confluent-ingestion"
},
"createTableIfNotExists": true,
"useSchemaDiscovery": true,
"dimensionExclusions": [
"date",
"time"
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"source": {
"type": "connection",
"connectionName": "my_connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
}
],
"target": {
"type": "table",
"tableName": "sample-confluent-ingestion"
},
"createTableIfNotExists": True,
"useSchemaDiscovery": True,
"dimensionExclusions": [
"date",
"time"
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job.
{
"source": {
"connectionName": "my_connection",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
}
],
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
}
],
"dimensionExclusions": [
"date",
"time"
],
"useSchemaDiscovery": true,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-12T22:58:56.158957343Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a8b9e-841e-7630-a722-b611d3821562",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-12T22:58:56.158957343Z",
"spec": {
"source": {
"connectionName": "my_connection",
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
}
],
"type": "connection"
},
"target": {
"tableName": "sample-confluent-ingestion",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"dimensionExclusions": [
"date",
"time"
],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"earlyMessageRejectionPeriod": "P2000D",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": true,
"replaceRunning": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "sample-confluent-ingestion",
"type": "table",
"intervals": []
},
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
Add data to your topic
Once the ingestion job is running, add event data to your Confluent Cloud topic. For example, the following is a single message produced to the topic my_topic. With schema auto-discovery, when subsequent messages include new properties, Polaris automatically detects them and ingests them as dimension columns in the table.
{
"date": "2023-09-01",
"time": "02:47:05.474Z",
"country": "australia",
"city": "darwin"
}
Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. Use batch file ingestion if you want to ingest older data.
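As an illustration, the following Python sketch produces the message above to my_topic using the confluent-kafka client. The choice of client library is an assumption; any Kafka-compatible producer works. The bootstrap server, API key, and secret are the same values used for the connection.
import os
import json
from confluent_kafka import Producer  # assumes the confluent-kafka package is installed
# SASL/PLAIN configuration for Confluent Cloud, reusing the connection credentials.
producer = Producer({
    "bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("CONFLUENT_KEY"),
    "sasl.password": os.getenv("CONFLUENT_SECRET")
})
event = {
    "date": "2023-09-01",
    "time": "02:47:05.474Z",
    "country": "australia",
    "city": "darwin"
}
# Produce the event as a JSON-encoded message value, then wait for delivery.
producer.produce("my_topic", value=json.dumps(event).encode("utf-8"))
producer.flush()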
The new data is ingested into the Polaris table you specified in the job.
For details on how to monitor or cancel the ingestion job, see View and manage ingestion jobs by API.
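For example, assuming the standard Jobs v1 lookup endpoint, the following cURL sketch retrieves the job created above using the id value from the sample response:
curl --location --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/018a8b9e-841e-7630-a722-b611d3821562" \
--user ${POLARIS_API_KEY}: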
Learn more
See the following topics for more information:
- Ingest from Confluent Cloud for reference on connecting from Confluent Cloud to Polaris.
- Create a streaming ingestion job for details on creating streaming ingestion jobs in Polaris.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.