Ingest data from Confluent Cloud by API

You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Confluent Cloud. Confluent Cloud is a fully managed, cloud-native service for Apache Kafka.

This topic covers the process you need to follow to create a connection to Confluent Cloud, then create a job to ingest data from Confluent Cloud into a Polaris table. For information on how to create connections in the Polaris UI, see Create a connection.

Prerequisites

Before you create a connection to ingest from Confluent Cloud, complete the following:

  • Confirm that you have an active Confluent Cloud account and topic.

  • Verify that your events are in a supported format and that the event timestamps are within 30 days of ingestion time. If you need to ingest events older than 30 days, configure lateMessageRejectionPeriod accordingly in your streaming ingestion job.

  • Review Connect to Confluent Cloud for the required information to create the connection.

  • If you don't have one already, create a Polaris API key with the ManageConnections permission. If you plan to create tables or ingestion jobs, you also need the ManageTables and ManageIngestionJobs permissions, respectively. For more information on permissions, see Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key, as shown below.
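
For example, you might export the key in your shell before running the sample requests. A minimal sketch; the value is a placeholder for your actual key:

export POLARIS_API_KEY=<your-api-key>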

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Create a connection to Confluent Cloud

Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.

You can create a connection per topic, or provide a regular expression for topicName to match multiple Kafka topics for the connection. An ingestion job from a multi-topic connection ingests the data from all matched topics into the destination table.
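
For example, a connection body like the following matches every topic whose name starts with clickstream-. This is a sketch: the clickstream-.* pattern and connection name are hypothetical, and the other fields mirror the sample request below.

{
  "type": "confluent",
  "name": "my_multitopic_connection",
  "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "topicName": "clickstream-.*",
  "secrets": {
    "type": "sasl_plain",
    "username": "$CONFLUENT_KEY",
    "password": "$CONFLUENT_SECRET"
  }
}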

See the Prerequisites and the Connections v1 API documentation for a description of required parameters.

Sample request

The following example request creates a connection named my_connection to topic my_topic in Confluent Cloud.

The environment variables $CONFLUENT_KEY and $CONFLUENT_SECRET store the Confluent Cloud API key and secret, respectively.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "confluent",
  "name": "my_connection",
  "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "topicName": "my_topic",
  "secrets": {
    "type": "sasl_plain",
    "username": "'"$CONFLUENT_KEY"'",
    "password": "'"$CONFLUENT_SECRET"'"
  }
}'

Sample response

A successful request returns a 200 OK response and the details of the connection, for example:

{
  "secrets": {
    "username": "$CONFLUENT_KEY",
    "type": "sasl_plain"
  },
  "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "topicName": "my_topic",
  "type": "confluent",
  "name": "my_connection",
  "description": null,
  "submittedByUser": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "submittedOnTimestamp": "2023-03-28T17:35:27.305019Z",
  "modifiedByUser": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "modifiedOnTimestamp": "2023-03-28T17:35:27.305019Z"
}

Test a connection

You can send a POST request to the /v1/projects/PROJECT_ID/connections/{name}/test endpoint to test a connection.
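
For example, to test the my_connection connection created earlier, a minimal sketch:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/my_connection/test" \
--header "Authorization: Basic $POLARIS_API_KEY"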

If the request fails, the response includes detailed information about the reason for the test failure. You can then update the connection and try again. Use the /v1/projects/PROJECT_ID/connections/{name} endpoint to update your connection. See the Connections v1 API documentation for details.

Start an ingestion job

Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job using your connection. You can create multiple ingestion jobs with the same connection, for example, to ingest event data from a topic into both a detail table and an aggregate table.

Use schema auto-discovery for your streaming ingestion job ("useSchemaDiscovery": true) so that Polaris automatically detects the schema of the input data and maps the discovered fields directly to columns in the destination table. This lets you skip defining each column in the input schema and each relationship in the mappings.

The ingestion job spec in this example enables schema auto-discovery but still defines the requisite input fields and mapping for the primary timestamp, __time. Because date and time are only used to generate the primary timestamp, the job spec adds them to dimensionExclusions so that they aren't created as their own columns.

See the Jobs v1 API documentation for a description of required parameters.

Sample request

The following example request creates an ingestion job for the sample-confluent-ingestion table using the my_connection connection.

info

Use the connection source type for an ingestion job that connects to Confluent Cloud.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
  "type": "streaming",
  "source": {
    "type": "connection",
    "connectionName": "my_connection",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      },
      {
        "dataType": "string",
        "name": "time"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
    }
  ],
  "target": {
    "type": "table",
    "tableName": "sample-confluent-ingestion"
  },
  "createTableIfNotExists": true,
  "useSchemaDiscovery": true,
  "dimensionExclusions": [
    "date",
    "time"
  ]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job.

{
  "source": {
    "connectionName": "my_connection",
    "formatSettings": {
      "flattenSpec": {},
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      },
      {
        "dataType": "string",
        "name": "time"
      }
    ],
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
      "isAggregation": null
    }
  ],
  "dimensionExclusions": [
    "date",
    "time"
  ],
  "useSchemaDiscovery": true,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "earlyMessageRejectionPeriod": "P2000D",
  "readFromPoint": "latest",
  "maxParseExceptions": 2147483647,
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2023-09-12T22:58:56.158957343Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "018a8b9e-841e-7630-a722-b611d3821562",
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2023-09-12T22:58:56.158957343Z",
  "spec": {
    "source": {
      "connectionName": "my_connection",
      "formatSettings": {
        "flattenSpec": {},
        "format": "nd-json"
      },
      "inputSchema": [
        {
          "dataType": "string",
          "name": "date"
        },
        {
          "dataType": "string",
          "name": "time"
        }
      ],
      "type": "connection"
    },
    "target": {
      "tableName": "sample-confluent-ingestion",
      "type": "table",
      "intervals": []
    },
    "createTableIfNotExists": true,
    "dimensionExclusions": [
      "date",
      "time"
    ],
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "earlyMessageRejectionPeriod": "P2000D",
    "mappings": [
      {
        "columnName": "__time",
        "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
        "isAggregation": null
      }
    ],
    "maxParseExceptions": 2147483647,
    "readFromPoint": "latest",
    "useSchemaDiscovery": true,
    "replaceRunning": false,
    "type": "streaming",
    "desiredExecutionStatus": "running"
  },
  "target": {
    "tableName": "sample-confluent-ingestion",
    "type": "table",
    "intervals": []
  },
  "type": "streaming",
  "completedTimestamp": null,
  "startedTimestamp": null
}

Add data to your topic

Once the ingestion job is running, add event data to your Confluent Cloud topic. For example, the following is a single message produced to the topic my_topic. With schema auto-discovery, when subsequent messages include new properties, Polaris automatically detects them and ingests them as dimension columns in the table.

{
  "date": "2023-09-01",
  "time": "02:47:05.474Z",
  "country": "australia",
  "city": "darwin"
}
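
One way to produce this message is with the Confluent CLI. This is a sketch, assuming the CLI is installed and authenticated against your cluster; any Kafka producer works:

echo '{"date": "2023-09-01", "time": "02:47:05.474Z", "country": "australia", "city": "darwin"}' | confluent kafka topic produce my_topic
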
info

Each event timestamp must be within 30 days of ingestion time. Polaris rejects events with timestamps older than 30 days. Use batch file ingestion if you want to ingest older data.
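
If your streaming job must accept older events instead, set lateMessageRejectionPeriod at the top level of the job spec, as noted in the prerequisites. For example, the following property extends the window to 45 days; the value is an arbitrary illustration:

"lateMessageRejectionPeriod": "P45D"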

The new data is ingested into the Polaris table you specified in the job.

For details on how to monitor or cancel the ingestion job, see View and manage ingestion jobs by API.
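
For a quick status check, you can retrieve the job by the id returned in the sample response. A minimal sketch; see that topic for the full API:

curl --location --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/018a8b9e-841e-7630-a722-b611d3821562" \
--header "Authorization: Basic $POLARIS_API_KEY"

The executionStatus field in the response reports the job state, such as pending or running.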

Learn more

See the following topics for more information:

  • Connect to Confluent Cloud for the information required to create a connection.
  • Connections v1 API for creating, testing, and updating connections.
  • Jobs v1 API for creating and managing ingestion jobs.
  • View and manage ingestion jobs by API for monitoring and canceling jobs.