Ingest data from Confluent Cloud by API
You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Confluent Cloud. Confluent Cloud is a fully managed, cloud-native service for Apache Kafka.
This topic shows you how to create a connection to Confluent Cloud, create a Polaris table to receive the data, and then create a job to ingest data from the source into the table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
Prerequisites
This topic assumes you have an active Confluent Cloud account. You will need the following before you start to set up ingestion from Confluent Cloud:

- Apache Kafka events to ingest into Polaris. See Supported data and file formats for the supported data formats for streaming ingestion. Each event timestamp must be within 30 days of ingestion time; Polaris rejects events with timestamps older than 30 days. Use batch file ingestion if you want to ingest older data.
- The name of the Confluent Cloud topic that contains the event data.
- Bootstrap server information: a list of one or more host and port pairs representing the addresses of brokers in the Kafka cluster, in the form `host1:port1,host2:port2,...`. For details on where to find the bootstrap server in Confluent Cloud, see Access cluster settings in the Confluent Cloud Console. You can also use the Confluent Cloud API to read a cluster and find its bootstrap server information in the `kafka_bootstrap_endpoint` property of the cluster object.
- A Confluent Cloud API key and secret that Polaris will use to make the connection. Note that resource-specific API keys in Confluent Cloud require an access control list that restricts access. See Use API keys to control access for more information.

You will also need a Polaris API key with the following permissions:

- `ManageTables`
- `ManageConnections`
- `ManageIngestionJobs`

In the examples below, the key value is stored in the variable named `POLARIS_API_KEY`.
See Authenticate with API keys to obtain an API key and assign service account permissions. For more information on permissions, see the Permissions reference.
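The bootstrap server value is a single comma-separated string of `host:port` pairs. As an illustration only (this helper is not part of any Polaris or Confluent API), a quick way to sanity-check the format before creating a connection:

```python
def parse_bootstrap_servers(servers: str) -> list[tuple[str, int]]:
    """Split a comma-separated host:port list into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        # rpartition tolerates hostnames that themselves contain colons
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"malformed bootstrap server entry: {entry!r}")
        pairs.append((host, int(port)))
    return pairs

print(parse_bootstrap_servers("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"))
```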
Create a table
Create a new table to receive the ingested data, or locate the details of an existing table.
The following sample request creates the `sample-confluent-ingestion` table using the Tables v2 API.
```bash
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "detail",
    "name": "sample-confluent-ingestion",
    "schema": [
        {
            "name": "__time",
            "dataType": "timestamp"
        },
        {
            "name": "country",
            "dataType": "string"
        },
        {
            "name": "city",
            "dataType": "string"
        }
    ]
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "detail",
    "name": "sample-confluent-ingestion",
    "schema": [
        {
            "name": "__time",
            "dataType": "timestamp"
        },
        {
            "name": "country",
            "dataType": "string"
        },
        {
            "name": "city",
            "dataType": "string"
        }
    ]
})

headers = {
    'Content-Type': 'application/json'
}

# Pass the API key as the Basic auth username with an empty password;
# requests base64-encodes the credentials, matching curl's --user flag
response = requests.post(url, headers=headers, data=payload, auth=(apikey, ''))
print(response.text)
```
Create a connection to Confluent Cloud
Each connection is associated with a single source of data. If you plan to ingest data from multiple topics, create a new connection for each one.
Send a `POST` request to the `/v2/connections` endpoint to create a connection.
See the Prerequisites and the Connections v2 API documentation for a description of required parameters.
Sample request
The following example request creates a connection named `my_connection` to the topic `my_topic` in Confluent Cloud:
```bash
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
})

headers = {
    'Content-Type': 'application/json'
}

# Pass the API key as the Basic auth username with an empty password,
# matching curl's --user flag
response = requests.post(url, headers=headers, data=payload, auth=(apikey, ''))
print(response.text)
```
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
```json
{
    "type": "confluent",
    "name": "my_connection",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "topicName": "my_topic"
}
```
Add your key and secret to the connection
Send a `PUT` request to the `/v2/connections/{name}/secrets` endpoint to add key and secret credentials to the connection. Replace `{name}` in the path with the name of your connection in Polaris.
See the Connections v2 API documentation for a description of required parameters.
Sample request
The following example request adds a Confluent Cloud key and secret to a connection named `my_connection`. Replace `$CONFLUENT_KEY` and `$CONFLUENT_SECRET` with your Confluent Cloud API key and secret:
```bash
curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "confluent",
    "key": "$CONFLUENT_KEY",
    "secret": "$CONFLUENT_SECRET"
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/secrets"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "confluent",
    "key": "$CONFLUENT_KEY",
    "secret": "$CONFLUENT_SECRET"
})

headers = {
    'Content-Type': 'application/json'
}

# Pass the API key as the Basic auth username with an empty password,
# matching curl's --user flag
response = requests.put(url, headers=headers, data=payload, auth=(apikey, ''))
print(response.text)
```
Sample response
A successful request returns a `200 OK` response.
Test a connection
You can send a POST
request to the /v2/connections/{name}/test
endpoint to test a connection.
If the connection fails, the response includes detailed information about the reason for the failure so you can update the connection and try again. Use the /v2/connections/{name}
and /v2/connections/{name}/secrets
endpoints to update your connection. See the Connections v2 API documentation for details.
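For example, the following sketch tests the connection created earlier in this topic. The request is only sent when `POLARIS_API_KEY` is set, and the endpoint and auth style mirror the other examples here; check the Connections v2 API reference for the exact response shape.

```python
import os
import requests

# Test endpoint for the connection named my_connection
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/my_connection/test"
apikey = os.getenv("POLARIS_API_KEY")

if apikey:
    # POST with an empty body; on failure, the response body
    # describes why Polaris could not reach the topic
    response = requests.post(url, auth=(apikey, ''))
    print(response.status_code, response.text)
```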
Start an ingestion job
Send a `POST` request to the `/v2/jobs` endpoint to create an ingestion job using your connection and table details.
See the Jobs v2 API documentation for a description of required parameters. You can include the optional `readFromPoint` property in the job request to instruct Polaris to read data from the earliest or latest point available in the stream.
You can create multiple ingestion jobs with the same connection, for example, to ingest event data from a topic into both a detail table and an aggregate table.
Sample request
The following example request creates an ingestion job for the `sample-confluent-ingestion` table using the `my_connection` connection. Use source type `connection` for an ingestion job that connects to Confluent Cloud.
```bash
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    }
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    }
})

headers = {
    'Content-Type': 'application/json'
}

# Pass the API key as the Basic auth username with an empty password,
# matching curl's --user flag
response = requests.post(url, headers=headers, data=payload, auth=(apikey, ''))
print(response.text)
```
Sample response
A successful request returns a `201 Created` response and the details of the ingestion job:
```json
{
    "type": "streaming",
    "id": "ad837s4k-4633-6g56-8e23-7384kd905273",
    "target": {
        "type": "table",
        "tableName": "sample-confluent-ingestion"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "my_username",
        "userID": "12345-1a3b-36s9-7e23-277g7899c223"
    },
    "lastModifiedBy": {
        "username": "my_username",
        "userID": "12345-1a3b-36s9-7e23-277g7899c223"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-07-28T12:52:06.365276805Z",
    "lastUpdatedTimestamp": "2022-07-28T12:52:06.365276805Z",
    "source": {
        "type": "connection",
        "connectionName": "my_connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ]
}
```
Add data to your topic
Once the ingestion job is running, add event data to your Confluent Cloud topic. For example, the following is a single message produced to the topic `my_topic`. Note that the data structure matches the input schema in the ingestion job.
```json
{
    "date": "2022-08-01",
    "time": "02:47:05.474Z",
    "country": "australia",
    "city": "darwin"
}
```
Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. Use batch file ingestion if you want to ingest older data.
The new data is ingested into the Polaris table you specified in the job.
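The job's `__time` mapping concatenates the `date` and `time` fields with a `T` separator and parses the result, per the `TIME_PARSE(CONCAT("date", 'T', "time"))` expression. A rough Python equivalent of what happens to the sample event, for illustration only:

```python
from datetime import datetime

event = {"date": "2022-08-01", "time": "02:47:05.474Z", "country": "australia", "city": "darwin"}

# CONCAT("date", 'T', "time") builds an ISO 8601 timestamp string
iso_string = f'{event["date"]}T{event["time"]}'

# TIME_PARSE then interprets it as a UTC timestamp
parsed = datetime.strptime(iso_string, "%Y-%m-%dT%H:%M:%S.%f%z")
print(iso_string)
print(parsed.isoformat())
```

This is why both `date` and `time` appear in the job's input schema even though the table only stores the combined `__time` column.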
Cancel a job
To cancel an ingestion job, follow these steps:

1. Send a `GET` request to the `/v2/jobs` endpoint to find the job ID.
2. Send a `PUT` request with the job ID to the `/v2/jobs/{jobId}` endpoint and set the `desiredExecutionStatus` to `canceled`.
See the Jobs v2 API documentation for a description of required parameters.
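Step 1 can be sketched as follows. The request is only sent when `POLARIS_API_KEY` is set; the `values` envelope and the filter on the target table name are assumptions based on the job object shown earlier, so check the Jobs v2 API reference for the exact list-response shape.

```python
import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")

if apikey:
    response = requests.get(url, auth=(apikey, ''))
    # Assumed envelope: each listed job carries "id" and "target"
    # fields like the sample job-creation response above
    for job in response.json().get("values", []):
        if job.get("target", {}).get("tableName") == "sample-confluent-ingestion":
            print(job["id"])
```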
Sample request
The following request is an example of step 2. It cancels a job for the specified job ID.
```bash
curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/gs65dfe6-32f5-7s3k-n5sd-gre007e74422' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "desiredExecutionStatus": "canceled"
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/gs65dfe6-32f5-7s3k-n5sd-gre007e74422"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "desiredExecutionStatus": "canceled"
})

headers = {
    'Content-Type': 'application/json'
}

# Pass the API key as the Basic auth username with an empty password,
# matching curl's --user flag
response = requests.put(url, headers=headers, data=payload, auth=(apikey, ''))
print(response.text)
```
Sample response
A successful request returns a `200 OK` response and the details of the canceled job.
Monitor an ingestion job
See Monitor streaming ingestion from a connection for information on how to monitor the status of your ingestion job in the Polaris UI.
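You can also check a job over the API. Assuming the Jobs v2 API supports reading a single job by ID (the same `/v2/jobs/{jobId}` path the cancel step uses), a sketch that polls the job created above; the request is only sent when `POLARIS_API_KEY` is set:

```python
import os
import requests

job_id = "ad837s4k-4633-6g56-8e23-7384kd905273"  # "id" from the job-creation response
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/{job_id}"
apikey = os.getenv("POLARIS_API_KEY")

if apikey:
    response = requests.get(url, auth=(apikey, ''))
    job = response.json()
    # executionStatus moves from "pending" to "running" once ingestion starts
    print(job.get("executionStatus"), job.get("health", {}).get("status"))
```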
Known limitations
We're regularly updating Polaris to add features and fixes. If you encounter an issue, check the Known limitations page.
Learn more
See the following topics for more information:
- Ingest from Confluent Cloud for reference on connecting from Confluent Cloud to Polaris.
- Tables v2 API for information on creating and managing tables.
- Connections v2 API for information on creating and managing connections.
- Jobs v2 API for information on creating and managing ingestion jobs.