
Dimension table upserts

To update event data in a table in Imply Polaris, you typically create an ingestion job to replace all data within a time interval. For tables that store slowly changing attributes about permanent objects, replacing time ranges of data is not intuitive or efficient.

In these cases, you can use a "dimension table" in Polaris to store object attributes. To keep data in the dimension table fresh, create a streaming ingestion job with upserts enabled. The ingestion job inserts new rows and updates values for existing rows in a table based upon a key that you define in the job.

Consider a simple example: you have a large fact table of customer transactions, and you don't want to maintain customer details such as address in that table. Instead, you store those attributes in a "dimension table" called customer-info with the columns __time, customer_id, name, and address.

When you add or update a customer, you push an event to a Kafka stream, for example:

{"timestamp":"2025-01-10T22:24:56Z",
"customer_id":"1234",
"name": "Sally Sample",
"address": "123 A St., Alphaville, AA"}

If Polaris doesn't find an existing customer_id with the value "1234," it creates a new row for the customer. If Polaris detects an existing customer ID, it updates the "name" and "address" dimensions to the values from the event payload.

The following diagram illustrates how the streaming ingestion job consumes events from the "customer events" Kafka topic and updates or inserts records in the customer table:

diagram of the Polaris upsert process

After Polaris processes the events at "1," the customer table contains two records: customer 1234 and customer 2345. When Polaris processes the events at "2," it updates the address for customer 1234 and adds a record for customer 3456.

At query time, you can JOIN the dimension table "customer-info" to your fact table to return the latest customer data in your results.
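
For instance, a query along the following lines returns each transaction together with the customer's latest details. This is a minimal sketch: the fact table "transactions" and its transaction_id and amount columns are hypothetical stand-ins for your own schema.

-- Join each transaction to the latest customer attributes.
-- "transactions", transaction_id, and amount are hypothetical.
SELECT
  t.__time,
  t.transaction_id,
  t.amount,
  c.name,
  c.address
FROM "transactions" t
JOIN "customer-info" c
  ON t.customer_id = c.customer_id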

info

You can only create a streaming ingestion job with upserts enabled using the Jobs API.

Familiarize yourself with streaming ingestion jobs before you start working with dimension upserts.

The examples in this topic demonstrate creating a new table with a streaming ingestion job. If you want to perform upserts to an existing table, cancel any existing streaming ingestion jobs before creating a new job with upserts enabled.

Prerequisites

To create a streaming ingestion job with upserts enabled, ensure that you have the following:

  • A Kafka-based streaming connection.

  • If you don't have one already, create a Polaris API key with the ManageIngestionJobs permission. For more information on permissions, see Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key, as shown in the sketch after this list.
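
For example, you can store the key in an environment variable before running the requests in this topic. The key value here is a placeholder in the same masked form as the sample response below:

# Store the Polaris API key for the requests that follow.
# Replace the placeholder with your actual key value.
export POLARIS_API_KEY="pok_xxxxx...xxxxxx"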

Create a streaming ingestion job with upserts

To enable dimension upserts in a streaming ingestion job, set the following properties in the streaming ingestion payload:

  • enableUpserts: When true, the job updates values in existing rows as well as inserting new rows. Defaults to false, in which case Polaris treats each event as a new record.
  • upsertKeyColumns: An array of one or more column names that make up the key for a row in the dimension table.
  • upsertVersionColumn: The column that indicates the most recent data for a record, for example "__time" or "version".

For example:

{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "customer-info"
  },
  "createTableIfNotExists": true,
  "enableUpserts": true,
  "upsertKeyColumns": ["customer_id"],
  "upsertVersionColumn": "__time"
  ...
}

As events arrive on the stream, Polaris inserts new rows and updates existing rows in the target table based on the upsert key columns and the upsert version column.
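
To illustrate the role of the version column, suppose the two hypothetical events below arrive for the same customer_id, possibly out of order. With upsertVersionColumn set to "__time" and the event timestamp mapped to __time, the record with the later timestamp is expected to win, so the table ends up holding the Alphaville address regardless of arrival order:

{"timestamp": "2025-01-09T10:00:00Z", "customer_id": "1234", "name": "Sally Sample", "address": "456 B St., Betatown, BB"}
{"timestamp": "2025-01-10T22:24:56Z", "customer_id": "1234", "name": "Sally Sample", "address": "123 A St., Alphaville, AA"}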

Sample request

The following example request creates an ingestion job with upserts enabled:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "customer-table"
  },
  "createTableIfNotExists": true,
  "enableUpserts": true,
  "upsertKeyColumns": ["customer_id"],
  "upsertVersionColumn": "__time",
  "dimensionExclusions": [
    "time"
  ],
  "source": {
    "type": "connection",
    "connectionName": "customer-connection",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "time"
      },
      {
        "dataType": "long",
        "name": "version"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"time\")"
    }
  ],
  "useSchemaDiscovery": true,
  "readFromPoint": "earliest"
}'
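
To exercise the job, you can publish a test event to the Kafka topic behind your streaming connection. The sketch below uses the open source kcat client; the broker address and topic name are assumptions to replace with your own. The event carries the "time" and "version" fields declared in the input schema above:

# Publish one test event to the topic backing "customer-connection".
# Broker address and topic name are placeholders.
echo '{"time": "2025-01-10T22:24:56Z", "version": 1, "customer_id": "1234", "name": "Sally Sample", "address": "123 A St., Alphaville, AA"}' \
  | kcat -b localhost:9092 -t customer-events -P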

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
  "source": {
    "connectionName": "customer-connection",
    "formatSettings": {
      "flattenSpec": null,
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "time"
      },
      {
        "dataType": "long",
        "name": "version"
      }
    ],
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"time\")",
      "isAggregation": null
    }
  ],
  "dimensionExclusions": [
    "time"
  ],
  "useSchemaDiscovery": true,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "earlyMessageRejectionPeriod": "P2000D",
  "readFromPoint": "earliest",
  "maxParseExceptions": 2147483647,
  "createdBy": {
    "username": "api-key-pok_xxxxx...xxxxxx",
    "userId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
  },
  "createdTimestamp": "2025-01-10T23:38:42.650964Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
  "lastModifiedBy": {
    "username": "api-key-pok_xxxxx...xxxxxx",
    "userId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
  },
  "lastUpdatedTimestamp": "2025-01-10T23:38:42.650964Z",
  "spec": {
    "source": {
      "connectionName": "customer-connection",
      "formatSettings": {
        "flattenSpec": null,
        "format": "nd-json"
      },
      "inputSchema": [
        {
          "dataType": "string",
          "name": "time"
        },
        {
          "dataType": "long",
          "name": "version"
        }
      ],
      "type": "connection"
    },
    "target": {
      "tableName": "customer-table",
      "type": "table",
      "intervals": null
    },
    "clusteringColumns": [],
    "createTableIfNotExists": true,
    "dimensionExclusions": [
      "time"
    ],
    "earlyMessageRejectionPeriod": "P2000D",
    "enableUpserts": true,
    "filterExpression": null,
    "includeAllDimensions": null,
    "lateMessageRejectionPeriod": "P30D",
    "mappings": [
      {
        "columnName": "__time",
        "expression": "TIME_PARSE(\"time\")",
        "isAggregation": null
      }
    ],
    "maxParseExceptions": 2147483647,
    "partitionedBy": null,
    "readFromPoint": "earliest",
    "replaceRunning": false,
    "upsertKeyColumns": [
      "customer_id"
    ],
    "upsertVersionColumn": "__time",
    "useSchemaDiscovery": true,
    "type": "streaming"
  },
  "target": {
    "tableName": "customer-table",
    "type": "table",
    "intervals": null
  },
  "type": "streaming",
  "completedTimestamp": null,
  "startedTimestamp": null
}
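
You can use the id field from the response to monitor the job. As a sketch, assuming the Jobs API convention of retrieving a job by its identifier, where JOB_ID stands in for the "id" value above:

# Retrieve the ingestion job details by job ID.
curl --location --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/JOB_ID" \
  --header "Authorization: Basic $POLARIS_API_KEY"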

Learn more

See the following topics for more information: