
Create a streaming ingestion job by API

Create a streaming ingestion job in Polaris to stream data into your tables from a source such as Apache Kafka, Confluent Cloud, or Amazon MSK. The concepts in this topic apply to all streaming jobs. For available streaming ingestion sources, see Ingestion sources overview.

In a streaming ingestion job, you specify the source of the input data, format settings of the data being ingested, how the input data maps to the table columns, and where in the stream to start ingesting data.

Streaming jobs support Polaris dimension table "upsert" functionality. After you familiarize yourself with streaming ingestion as detailed in this topic, see Dimension table upserts.

Before you continue in this topic, familiarize yourself with the basics of creating an ingestion job.

Prerequisites

Before streaming data into Polaris, ensure that you have the following:

  • A connection that describes your data source.

  • Events in a supported format, with event timestamps within 30 days of ingestion time. To ingest events older than 30 days, set lateMessageRejectionPeriod accordingly in your ingestion job.

  • A Polaris API key with the ManageIngestionJobs permission. If you don't have one already, create one. For more information on permissions, see Permissions reference. The examples in this topic store the API key in a variable named POLARIS_API_KEY.
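Because every request in this topic reads the key from POLARIS_API_KEY, a script can check that the variable is set before it calls the API. The following is a minimal sketch; the function name require_api_key is hypothetical.

```shell
# Hypothetical guard: fail early if the API key variable used in this topic is empty.
require_api_key() {
  if [ -z "${POLARIS_API_KEY:-}" ]; then
    echo "POLARIS_API_KEY is not set; create an API key with the ManageIngestionJobs permission first." >&2
    return 1
  fi
}

# Example: stop the script before any curl call if the key is missing.
# require_api_key || exit 1
```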

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Create a streaming ingestion job

Define a streaming ingestion job with the following properties:

  • type: Set the job type to streaming.

  • target: In this object, set the type to table, and assign the Polaris table name to tableName. For example:

    "target": {
      "type": "table",
      "tableName": "example-table"
    },
  • createTableIfNotExists: Boolean that directs Polaris to create the table if it doesn't already exist (false by default). When this property is true and the table does not exist, Polaris automatically creates the table using the framework in Automatically created tables.

  • source: In this object, set the type to connection, and set connectionName to the name of the connection for your data source. Also define the schema of the input data in inputSchema and the data format settings in formatSettings.

    The following example shows a source object for streaming ingestion:

    "source": {
      "type": "connection",
      "connectionName": "confluent-connection",
      "inputSchema": [
        {
          "dataType": "string",
          "name": "timestamp"
        },
        {
          "dataType": "string",
          "name": "col1"
        }
      ],
      "formatSettings": {
        "format": "nd-json"
      }
    },
  • mappings: In this array, describe how the input fields of the source data map to the columns of the target table. A mapping for the __time column is always required. Other mappings may be optional when useSchemaDiscovery is set to true. See Map and transform data with input expressions for details and usage notes.

  • useSchemaDiscovery: Set to true for Polaris to automatically discover the schema of the data being ingested. Applies only to streaming ingestion jobs into flexible tables.

    With schema auto-discovery, Polaris ingests all fields as dimension columns using the identity mapping. Exclude fields from ingestion by declaring them in dimensionExclusions. To ingest certain fields into measure columns, or to transform data during ingestion, define the mappings between these fields and the table columns in mappings.

    By default, useSchemaDiscovery is set to false. In this case, you must define each mapping between the source data and the target table. When schema auto-discovery is disabled, Polaris ignores any unmapped input fields.

  • dimensionExclusions: Deny list containing the input fields you want to exclude from ingestion. Applies only when useSchemaDiscovery is true. For example:

    {
      "type": "streaming",
      ...
      "useSchemaDiscovery": true,
      "dimensionExclusions": ["col1", "col2"]
    }
  • readFromPoint: Set to earliest or latest to instruct Polaris to read data from the earliest or latest point available in the stream, respectively.

    Note: For a given table and topic or stream, Polaris preserves the reading checkpoint of the topic or stream. To manually reset the offset from which Polaris ingests streaming data, see Reset streaming job offset.

  • earlyMessageRejectionPeriod: Time period relative to the current time that determines the cutoff for future data. Provide an ISO 8601 duration, such as P2000D. By default, Polaris doesn't ingest data with timestamps more than 2000 days in the future.

  • lateMessageRejectionPeriod: Time period relative to the current time that determines the cutoff for past data. Provide an ISO 8601 duration, such as P30D. By default, Polaris doesn't ingest data with timestamps more than 30 days in the past.
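Together, the two rejection periods define a window around the current time. The following sketch, which is not Polaris code, checks whether an event timestamp in Unix epoch seconds falls inside that window. It assumes day-based ISO 8601 durations of the form PnD only, treats both boundaries as inclusive, and uses hypothetical function names.

```shell
# Convert a day-based ISO 8601 duration such as P30D or P2000D to seconds.
# Other duration forms (PT1H, P1Y2D, ...) are rejected with status 1.
duration_days_to_seconds() {
  case "$1" in
    P*D)
      days=${1#P}; days=${days%D}
      case "$days" in
        ''|*[!0-9]*) return 1 ;;  # not a whole number of days
      esac
      # Strip leading zeros so shell arithmetic does not read the value as octal.
      while [ "${#days}" -gt 1 ] && [ "${days#0}" != "$days" ]; do days=${days#0}; done
      echo $((days * 86400))
      ;;
    *) return 1 ;;
  esac
}

# Usage: in_window NOW_EPOCH EVENT_EPOCH [LATE_PERIOD] [EARLY_PERIOD]
# Succeeds if NOW - LATE <= EVENT <= NOW + EARLY (defaults P30D and P2000D).
in_window() {
  now=$1 event=$2
  late=$(duration_days_to_seconds "${3:-P30D}") || return 2
  early=$(duration_days_to_seconds "${4:-P2000D}") || return 2
  [ "$event" -ge $((now - late)) ] && [ "$event" -le $((now + early)) ]
}
```

For example, in_window "$(date +%s)" 1700000000 P90D reports whether that event would fall inside a 90-day late window.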

See the Jobs v1 API documentation for a description of all parameters.

The following example shows a job spec for a streaming ingestion job:

{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "example-connection",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"date\")"
    }
  ],
  "useSchemaDiscovery": true,
  "dimensionExclusions": [
    "col1",
    "col2"
  ],
  "readFromPoint": "earliest"
}
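In this spec, schema auto-discovery is enabled and col1 and col2 are excluded. As a simplified mental model only, the following hypothetical function lists the input fields that remain candidates for identity-mapped dimension columns: every field not named in dimensionExclusions. It deliberately ignores how Polaris treats fields that are also referenced in mappings.

```shell
# Simplified model (not Polaris code): with useSchemaDiscovery=true, input fields
# that are not listed in dimensionExclusions are candidates for identity-mapped
# dimension columns. Both arguments are space-separated field names.
discovered_fields() {
  fields=$1 exclusions=$2
  result=""
  for f in $fields; do
    case " $exclusions " in
      *" $f "*) ;;                          # excluded: skip the field
      *) result="${result:+$result }$f" ;;  # kept: append to the result
    esac
  done
  echo "$result"
}

# Example: discovered_fields "date col1 col2 country" "col1 col2"
```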

Sample request

Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. You can create multiple ingestion jobs with the same connection. For example, you can ingest event data from a topic into both a detail table and an aggregate table.

The following example request creates an ingestion job using the example-connection connection for a flexible table named example-table:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
  --header "Authorization: Basic $POLARIS_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "example-connection",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"date\")"
    }
  ],
  "useSchemaDiscovery": true,
  "dimensionExclusions": [
    "col1",
    "col2"
  ],
  "readFromPoint": "earliest"
}'
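Instead of pasting the spec inline, you can keep it in a file and let curl read it. The sketch below assumes you have exported ORGANIZATION_NAME, REGION, CLOUD_PROVIDER, and PROJECT_ID with your own values; the helper names are hypothetical. curl's --data-binary @FILE option sends the file contents unchanged, so no shell quoting of the JSON is needed.

```shell
# Build the jobs endpoint from the placeholders used in this topic.
# Usage: polaris_jobs_url ORGANIZATION_NAME REGION CLOUD_PROVIDER PROJECT_ID
polaris_jobs_url() {
  echo "https://$1.$2.$3.api.imply.io/v1/projects/$4/jobs"
}

# Hypothetical wrapper: POST a job spec stored in a local JSON file.
create_job_from_file() {
  spec_file=$1
  curl --location --request POST \
    "$(polaris_jobs_url "$ORGANIZATION_NAME" "$REGION" "$CLOUD_PROVIDER" "$PROJECT_ID")" \
    --header "Authorization: Basic $POLARIS_API_KEY" \
    --header "Content-Type: application/json" \
    --data-binary "@$spec_file"
}
```

For example, save the job spec from the previous section as streaming-job.json and run create_job_from_file streaming-job.json.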

Sample response

The following example shows a successful response:

{
  "source": {
    "connectionName": "example-connection",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      }
    ],
    "formatSettings": {
      "flattenSpec": {},
      "format": "nd-json"
    },
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"date\")"
    }
  ],
  "dimensionExclusions": [],
  "useSchemaDiscovery": false,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "readFromPoint": "latest",
  "type": "streaming",
  "id": "e9b77d70-1b01-42a9-aa1c-56f764d105d1",
  "target": {
    "tableName": "example-table",
    "intervals": [],
    "type": "table"
  },
  "createTableIfNotExists": true,
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2023-03-23T23:00:57.235117332Z",
  "lastUpdatedTimestamp": "2023-03-23T23:00:57.235117332Z",
  "startedTimestamp": null,
  "completedTimestamp": null
}
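You need the job ID from the id field to manage the job later, for example to reset its offset. The following dependency-free sketch extracts it from the response text with grep and sed. It assumes the first "id" key in the response is the job ID (userId keys do not match the pattern); the function name is hypothetical, and a JSON processor such as jq is more robust where available.

```shell
# Hypothetical helper: read a create-job response on stdin and print the job ID.
# Matches both pretty-printed ("id": "...") and compact ("id":"...") JSON.
job_id_from_response() {
  grep -o '"id": *"[^"]*"' | head -n 1 | sed 's/.*"\([^"]*\)"$/\1/'
}
```

Pipe the output of the curl request into job_id_from_response and store the result, for example in a JOB_ID variable.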

Create a streaming ingestion job without auto-discovery

When schema auto-discovery for ingestion jobs is disabled ("useSchemaDiscovery": false), you must define all the input fields in inputSchema and list how the input fields map to the table columns in mappings.

Schema auto-discovery only applies to flexible tables. When schema auto-discovery is disabled, you can ingest data into either strict or flexible tables.

Sample request

The following example request creates a streaming ingestion job with schema auto-discovery disabled:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
  --header "Authorization: Basic $POLARIS_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "example-connection",
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      },
      {
        "dataType": "string",
        "name": "time"
      },
      {
        "dataType": "string",
        "name": "country"
      },
      {
        "dataType": "string",
        "name": "city"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
    },
    {
      "columnName": "country",
      "expression": "country"
    },
    {
      "columnName": "city",
      "expression": "city"
    }
  ],
  "useSchemaDiscovery": false,
  "readFromPoint": "latest"
}'
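The __time mapping in this request needs literal single quotes ('T') inside a --data-raw argument that is itself single-quoted, so each quote is written as '\'' (close the string, add an escaped quote, reopen the string). The response shows the expression as Polaris received it. The sketch below compares that escaping with a quoted heredoc (<<'EOF'), which disables shell expansion so the JSON can be written verbatim; the variable and function names are illustrative.

```shell
# '\'' ends the single-quoted string, emits an escaped literal quote,
# and starts a new single-quoted string. \" stays as-is (a JSON escape).
escaped_expression='TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))'

# Alternative: a quoted heredoc performs no expansion or quote processing,
# so the mapping is written exactly as it should appear in the request body.
mapping_json() {
  cat <<'EOF'
{"columnName": "__time", "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"}
EOF
}
```

You could then pipe a full spec written this way into curl with --data-binary @-, which reads the request body from standard input.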

Sample response

The following example shows a successful response:

{
  "source": {
    "connectionName": "example-connection",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      },
      {
        "dataType": "string",
        "name": "time"
      },
      {
        "dataType": "string",
        "name": "country"
      },
      {
        "dataType": "string",
        "name": "city"
      }
    ],
    "formatSettings": {
      "flattenSpec": {},
      "format": "nd-json"
    },
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
    },
    {
      "columnName": "country",
      "expression": "country"
    },
    {
      "columnName": "city",
      "expression": "city"
    }
  ],
  "dimensionExclusions": [],
  "useSchemaDiscovery": false,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "readFromPoint": "latest",
  "type": "streaming",
  "id": "a2ad0336-9582-4404-bc44-82f20747b536",
  "target": {
    "tableName": "example-table",
    "intervals": [],
    "type": "table"
  },
  "createTableIfNotExists": true,
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2023-04-14T18:48:25.750490866Z",
  "lastUpdatedTimestamp": "2023-04-14T18:48:25.750490866Z",
  "startedTimestamp": null,
  "completedTimestamp": null
}

Reset streaming job offset

For a given table and event source, such as a Kafka topic or Kinesis stream, Polaris retains the position from which it last successfully ingested the streaming data. Polaris associates the topic or stream name with the table regardless of the connection name.

Subsequent ingestion jobs with the same source and table read from the last stored position, not from the position set in readFromPoint. Polaris only starts ingesting from the configured read point when the job reads from a different topic or stream name, or when it targets a table with a different name. To force Polaris to ingest data from the offset configured in readFromPoint, reset the streaming ingestion job.
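The following simplified model, which is not Polaris code and uses hypothetical names, illustrates these rules: stored positions are keyed by topic or stream name plus table name, the connection name is not part of the key, and a reset discards the stored position so the job falls back to readFromPoint.

```shell
# Simplified model of stored read positions, one "TOPIC|TABLE" entry per line.
# The connection name is intentionally not part of the key.
CHECKPOINTS=""

record_checkpoint() {  # usage: record_checkpoint TOPIC TABLE
  CHECKPOINTS="$CHECKPOINTS
$1|$2"
}

reset_checkpoint() {  # usage: reset_checkpoint TOPIC TABLE
  # "|| true" keeps the assignment from failing if no lines remain.
  CHECKPOINTS=$(printf '%s\n' "$CHECKPOINTS" | grep -Fxv "$1|$2" || true)
}

start_position() {  # usage: start_position TOPIC TABLE READ_FROM_POINT
  if printf '%s\n' "$CHECKPOINTS" | grep -Fx "$1|$2" >/dev/null; then
    echo "stored-offset"  # same topic and table: resume from the stored position
  else
    echo "$3"             # new topic or table (or after a reset): use readFromPoint
  fi
}
```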

You can issue the reset on any running streaming job. To reset a job, send a POST request to /v1/projects/PROJECT_ID/jobs/JOB_ID/reset. To reset a job using the UI, see Reset streaming job offset.

Note: Resetting the job may cause Polaris to skip events or read them twice, resulting in missing or duplicate data.

Sample request

The following example shows how to reset a streaming ingestion job with the job ID db6a3110-d6c3-4a63-86b2-41d51a65ce11:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset" \
--header "Authorization: Basic $POLARIS_API_KEY"
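If you stored the job ID in a variable, a small helper can build the reset URL and catch obviously malformed IDs before sending the request. This is a sketch with a hypothetical function name; the check only verifies the 8-4-4-4-12 shape of a UUID such as the one above, not that the job exists.

```shell
# Hypothetical helper: build the reset endpoint for a job.
# Usage: polaris_reset_url BASE_URL PROJECT_ID JOB_ID
polaris_reset_url() {
  case "$3" in
    ????????-????-????-????-????????????) ;;  # shape check only (8-4-4-4-12)
    *) echo "unexpected job ID: $3" >&2; return 1 ;;
  esac
  echo "$1/v1/projects/$2/jobs/$3/reset"
}

# Example:
# curl --location --request POST \
#   "$(polaris_reset_url "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io" PROJECT_ID "$JOB_ID")" \
#   --header "Authorization: Basic $POLARIS_API_KEY"
```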

Sample response

A successful request returns a 200 OK response.

Learn more

See the following topics for more information: