Create a streaming ingestion job by API
Create a streaming ingestion job in Polaris to stream data into your tables from a source such as Apache Kafka, Confluent Cloud, or Amazon MSK. The concepts in this topic apply to all streaming jobs. For available streaming ingestion sources, see Ingestion sources overview.
In a streaming ingestion job, you specify the source of the input data, format settings of the data being ingested, how the input data maps to the table columns, and where in the stream to start ingesting data.
Streaming jobs support Polaris dimension table "upsert" functionality. After you familiarize yourself with streaming ingestion as detailed in this topic, see Dimension table upserts.
Before you continue in this topic, familiarize yourself with the basics of creating an ingestion job.
Prerequisites
Before streaming data into Polaris, ensure that you have the following:
- A connection that describes your data source.
- Events in a supported format and event timestamps within 30 days of ingestion time. If you need to ingest events older than 30 days, configure lateMessageRejectionPeriod accordingly in your ingestion job.
- A Polaris API key with the ManageIngestionJobs permission. If you don't have one already, create one. For more information on permissions, visit Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Create a streaming ingestion job
Define a streaming ingestion job with the following properties:
- type: Set the job type to streaming.
- target: In this object, set the type to table, and assign the Polaris table name to tableName. For example:
    "target": {
      "type": "table",
      "tableName": "example-table"
    },
- createTableIfNotExists: Boolean that directs Polaris to create the table if it doesn't already exist (false by default). When this property is true and the table does not exist, Polaris automatically creates the table using the framework in Automatically created tables.
- source: In this object, set the type to connection, and list the connection name of the source of data in connectionName. Also define the schema of the input data in inputSchema and data format settings in formatSettings. The following example shows a source object for streaming ingestion:
    "source": {
      "type": "connection",
      "connectionName": "confluent-connection",
      "inputSchema": [
        {
          "dataType": "string",
          "name": "timestamp"
        },
        {
          "dataType": "string",
          "name": "col1"
        }
      ],
      "formatSettings": {
        "format": "nd-json"
      }
    },
- mappings: In this array, describe how the input fields of the source data map to the columns of the target table. A mapping for the __time column is always required. Other mappings may be optional when useSchemaDiscovery is set to true. See Map and transform data with input expressions for details and usage notes.
- useSchemaDiscovery: Set to true for Polaris to automatically discover the schema of the data being ingested. Only applies to a streaming ingestion job into a flexible table.
  With schema auto-discovery, Polaris ingests all fields as dimension columns using the identity mapping. Exclude fields from ingestion by declaring them in dimensionExclusions. To ingest certain fields into measure columns, or to transform data during ingestion, define the mappings between these fields and the table columns in mappings.
  By default, useSchemaDiscovery is set to false. In this case, you must define each mapping between the source data and the target table. When schema auto-discovery is disabled, Polaris ignores any unmapped input fields.
- dimensionExclusions: Deny list containing the input fields you want to exclude from ingestion. Applies when useSchemaDiscovery is true. For example:
    {
      "type": "streaming",
      ...
      "useSchemaDiscovery": true,
      "dimensionExclusions": ["col1", "col2"]
    }
- readFromPoint: Set to earliest or latest to instruct Polaris to read data from the earliest or latest point available in the stream, respectively.
  For a given table and topic or stream, Polaris preserves the reading checkpoint of the topic or stream. To manually reset the offset from which Polaris ingests streaming data, see Reset streaming job offset.
- earlyMessageRejectionPeriod: Time period relative to the current time that determines the cutoff for future data. Provide an ISO 8601 duration, such as P2000D. By default, Polaris doesn't ingest data with timestamps more than 2000 days in the future.
- lateMessageRejectionPeriod: Time period relative to the current time that determines the cutoff for past data. Provide an ISO 8601 duration, such as P30D. By default, Polaris doesn't ingest data with timestamps more than 30 days in the past.
See the Jobs v1 API documentation for a description of all parameters.
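To make the two message rejection periods more concrete, the following sketch checks whether an event timestamp falls inside the accepted window. It only illustrates the semantics described in this topic and is not how Polaris implements the check. The helper names parse_days and is_accepted are hypothetical, the parser handles only whole-day durations of the form PnD, and treating the window boundaries as inclusive is an assumption.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_days(duration: str) -> timedelta:
    """Parse a whole-day ISO 8601 duration such as "P30D" (other forms are not handled)."""
    match = re.fullmatch(r"P(\d+)D", duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration!r}")
    return timedelta(days=int(match.group(1)))


def is_accepted(event_time, now, late="P30D", early="P2000D"):
    """Return True if event_time lies within [now - late, now + early] (inclusive by assumption)."""
    return now - parse_days(late) <= event_time <= now + parse_days(early)


now = datetime(2023, 4, 14, tzinfo=timezone.utc)
print(is_accepted(now - timedelta(days=10), now))               # inside the default 30-day window
print(is_accepted(now - timedelta(days=45), now))               # older than the default window
print(is_accepted(now - timedelta(days=45), now, late="P60D"))  # window widened to 60 days
```

In this model, widening lateMessageRejectionPeriod to P60D is what lets the 45-day-old event through.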
The following example shows a job spec for a streaming ingestion job:
{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "example-connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
}
],
"useSchemaDiscovery": true,
"dimensionExclusions": [
"col1",
"col2"
],
"readFromPoint": "earliest"
}
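If you generate job specs programmatically, a small helper can assemble the properties listed above and catch two mistakes before you send the request: a missing __time mapping and an invalid readFromPoint value. The following is a minimal sketch. The function name build_streaming_job_spec and its parameters are hypothetical, and it hard-codes the nd-json format for brevity.

```python
import json


def build_streaming_job_spec(table_name, connection_name, input_schema, mappings,
                             use_schema_discovery=False, dimension_exclusions=None,
                             read_from_point="latest", create_table=False):
    """Assemble a streaming job spec dict from the properties described in this topic."""
    # A mapping for the __time column is always required.
    if not any(m.get("columnName") == "__time" for m in mappings):
        raise ValueError("a mapping for the __time column is always required")
    if read_from_point not in ("earliest", "latest"):
        raise ValueError("readFromPoint must be 'earliest' or 'latest'")
    spec = {
        "type": "streaming",
        "target": {"type": "table", "tableName": table_name},
        "createTableIfNotExists": create_table,
        "source": {
            "type": "connection",
            "connectionName": connection_name,
            "formatSettings": {"format": "nd-json"},  # assumption: newline-delimited JSON input
            "inputSchema": input_schema,
        },
        "mappings": mappings,
        "useSchemaDiscovery": use_schema_discovery,
        "readFromPoint": read_from_point,
    }
    # dimensionExclusions applies only when schema auto-discovery is enabled.
    if use_schema_discovery and dimension_exclusions:
        spec["dimensionExclusions"] = list(dimension_exclusions)
    return spec


spec = build_streaming_job_spec(
    table_name="example-table",
    connection_name="example-connection",
    input_schema=[{"dataType": "string", "name": "date"}],
    mappings=[{"columnName": "__time", "expression": 'TIME_PARSE("date")'}],
    use_schema_discovery=True,
    dimension_exclusions=["col1", "col2"],
    read_from_point="earliest",
    create_table=True,
)
print(json.dumps(spec, indent=2))
```

The printed spec contains the same properties and values as the example job spec above.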
Sample request
Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job.
You can create multiple ingestion jobs with the same connection, for example, to ingest event data from a topic into both a detail table and an aggregate table.
The following example request creates an ingestion job using the example-connection connection for a flexible table named example-table:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "example-connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
}
],
"useSchemaDiscovery": true,
"dimensionExclusions": [
"col1",
"col2"
],
"readFromPoint": "earliest"
}'
Python:
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "example-connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
}
],
"useSchemaDiscovery": True,
"dimensionExclusions": [
"col1",
"col2"
],
"readFromPoint": "earliest"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a successful response:
{
"source": {
"connectionName": "example-connection",
"inputSchema": [
{
"dataType": "string",
"name": "date"
}
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"type": "streaming",
"id": "e9b77d70-1b01-42a9-aa1c-56f764d105d1",
"target": {
"tableName": "example-table",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-03-23T23:00:57.235117332Z",
"lastUpdatedTimestamp": "2023-03-23T23:00:57.235117332Z",
"startedTimestamp": null,
"completedTimestamp": null
}
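The response echoes the job spec and adds server-side fields such as id and executionStatus. The following sketch pulls out the fields you are most likely to need for follow-up requests, using a trimmed copy of the sample response above. The helper name summarize_job is hypothetical.

```python
import json

# Trimmed copy of the sample response shown above.
response_text = """
{
  "type": "streaming",
  "id": "e9b77d70-1b01-42a9-aa1c-56f764d105d1",
  "target": {"tableName": "example-table", "intervals": [], "type": "table"},
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {"status": "ok"}
}
"""


def summarize_job(body: str) -> dict:
    """Extract the job ID, target table, and status fields from a job response body."""
    job = json.loads(body)
    return {
        "id": job["id"],
        "table": job["target"]["tableName"],
        "executionStatus": job["executionStatus"],
        "health": job["health"]["status"],
    }


summary = summarize_job(response_text)
print(summary["id"], summary["executionStatus"])
```

With the requests library, you could call response.raise_for_status() before passing response.text to the helper so that HTTP errors surface as exceptions instead of JSON parsing failures.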
Create a streaming ingestion job without auto-discovery
When schema auto-discovery for ingestion jobs is disabled ("useSchemaDiscovery": false), you must define all the input fields in inputSchema and list how the input fields map to the table columns in mappings.
Schema auto-discovery only applies to flexible tables. When schema auto-discovery is disabled, you can ingest data into either strict or flexible tables.
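Because Polaris ignores unmapped input fields when schema auto-discovery is disabled, it can help to check a job spec locally before submitting it. The following sketch lists the inputSchema fields that no mapping expression references. The helper name unmapped_input_fields is hypothetical, and the matching is a simple heuristic, not a parser for Polaris expressions.

```python
def unmapped_input_fields(spec: dict) -> list:
    """Return inputSchema field names that no mapping expression references.

    Heuristic only: a field counts as referenced when an expression is exactly
    the field name or contains the name in double quotes, e.g. TIME_PARSE("date").
    """
    expressions = [m["expression"] for m in spec["mappings"]]
    missing = []
    for field in spec["source"]["inputSchema"]:
        name = field["name"]
        quoted = f'"{name}"'
        if not any(expr == name or quoted in expr for expr in expressions):
            missing.append(name)
    return missing


spec = {
    "source": {
        "inputSchema": [
            {"dataType": "string", "name": "date"},
            {"dataType": "string", "name": "time"},
            {"dataType": "string", "name": "country"},
            {"dataType": "string", "name": "city"},
        ]
    },
    "mappings": [
        {"columnName": "__time",
         "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"},
        {"columnName": "country", "expression": "country"},
    ],
    "useSchemaDiscovery": False,
}
print(unmapped_input_fields(spec))  # city has no mapping in this spec
```

Here the city field is declared in inputSchema but never mapped, so with auto-discovery disabled it would not reach the table.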
Sample request
The following example request creates a streaming ingestion job with schema auto-discovery disabled:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "example-connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "country"
},
{
"dataType": "string",
"name": "city"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
},
{
"columnName": "country",
"expression": "country"
},
{
"columnName": "city",
"expression": "city"
}
],
"useSchemaDiscovery": false,
"readFromPoint": "latest"
}'
Python:
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "example-connection",
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "country"
},
{
"dataType": "string",
"name": "city"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
},
{
"columnName": "country",
"expression": "country"
},
{
"columnName": "city",
"expression": "city"
}
],
"useSchemaDiscovery": False,
"readFromPoint": "latest"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a successful response:
{
"source": {
"connectionName": "example-connection",
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "country"
},
{
"dataType": "string",
"name": "city"
}
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
},
{
"columnName": "country",
"expression": "country"
},
{
"columnName": "city",
"expression": "city"
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"type": "streaming",
"id": "a2ad0336-9582-4404-bc44-82f20747b536",
"target": {
"tableName": "example-table",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-04-14T18:48:25.750490866Z",
"lastUpdatedTimestamp": "2023-04-14T18:48:25.750490866Z",
"startedTimestamp": null,
"completedTimestamp": null
}
Reset streaming job offset
For a given table and event source, such as a Kafka topic or Kinesis stream, Polaris retains the position from which it last successfully ingested the streaming data. Polaris associates the topic or stream name with the table regardless of the connection name.
Subsequent ingestion jobs with the same source and table read from the last stored position, not the readFromPoint.
Polaris only starts ingesting from the configured read point when the job reads from a different topic or stream name or when it targets a table with a different name.
To force Polaris to ingest data using the offset configured in readFromPoint, reset the streaming ingestion job.
You can issue the reset on any running streaming job.
To reset a job, submit a POST request to /v1/projects/PROJECT_ID/jobs/JOB_ID/reset.
To reset a job using the UI, see Reset streaming job offset.
Resetting the job may cause Polaris to skip events or read them twice, resulting in missing or duplicate data.
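The checkpoint behavior described in this section can be pictured with a toy model. This is a conceptual sketch only and does not represent Polaris internals: the CheckpointStore class is hypothetical, and offsets are simplified to integers. It captures three points from the text: the stored position is keyed by table and topic or stream name (not by connection), a stored position takes precedence over readFromPoint, and a reset makes readFromPoint apply again.

```python
class CheckpointStore:
    """Toy model: one stored position per (table, topic or stream) pair."""

    def __init__(self):
        self._positions = {}

    def start_position(self, table, topic, read_from_point):
        """Return where a new job for this table and topic starts reading."""
        key = (table, topic)
        if key in self._positions:
            return self._positions[key]  # a stored position wins over readFromPoint
        return read_from_point           # no stored position: use readFromPoint

    def commit(self, table, topic, offset):
        """Record the last successfully ingested position."""
        self._positions[(table, topic)] = offset

    def reset(self, table, topic):
        """Discard the stored position so readFromPoint applies again."""
        self._positions.pop((table, topic), None)


store = CheckpointStore()
print(store.start_position("example-table", "events", "earliest"))  # first job: readFromPoint
store.commit("example-table", "events", 1500)
print(store.start_position("example-table", "events", "latest"))    # stored position is used
print(store.start_position("other-table", "events", "latest"))      # different table: readFromPoint
store.reset("example-table", "events")
print(store.start_position("example-table", "events", "latest"))    # after reset: readFromPoint
```

The last call shows why a reset can skip or repeat events: the job jumps to the earliest or latest point instead of continuing from position 1500.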
Sample request
The following example shows how to reset a streaming ingestion job with the job ID db6a3110-d6c3-4a63-86b2-41d51a65ce11:
cURL:
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset" \
--header "Authorization: Basic $POLARIS_API_KEY"
Python:
import os
import requests
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset"
apikey = os.getenv("POLARIS_API_KEY")
payload={}
headers = {
'Authorization': f'Basic {apikey}'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response.
Learn more
See the following topics for more information:
- Jobs v1 API for reference on the Polaris API.
- Schema auto-discovery to learn about automatic detection of your input fields.
- Troubleshoot data ingestion for issues around ingestion.
- Dimension table upserts to learn how to keep dimension table data current.
- Ingest from one of the following streaming ingestion sources:
- Ingest event metadata along with message values: