Create a streaming ingestion job by API
Create a streaming ingestion job in Polaris to stream data into your tables from a source such as Apache Kafka, Confluent Cloud, or Amazon MSK.
In a streaming ingestion job, you specify the source of the input data, format settings of the data being ingested, how the input data maps to the table columns, and where in the stream to start ingesting data.
Before you continue in this topic, familiarize yourself with the basics of creating an ingestion job.
The concepts in this topic apply to all streaming jobs. For available streaming ingestion sources, see Ingestion sources overview.
Prerequisites
Before streaming data into Polaris, ensure that you have a target table and a connection that describes your data source.
This topic assumes that you have an API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY.
To obtain an API key and assign permissions, see API key authentication.
For more information on permissions, see Permissions reference.
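The examples in this topic read the key from the environment. As a minimal sketch, you can confirm the variable is set before running them:

import os

# Sketch: confirm the POLARIS_API_KEY environment variable is set
# before running the samples in this topic.
apikey = os.getenv("POLARIS_API_KEY")
if not apikey:
    raise RuntimeError("Set POLARIS_API_KEY to your Polaris API key")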
Create a streaming ingestion job
Define a streaming ingestion job with the following properties:
type: Set the job type to streaming.

target: In this object, set the type to table, and assign the Polaris table name to tableName. For example:

"target": {
    "type": "table",
    "tableName": "example-table"
},

source: In this object, set the type to connection, and list the connection name of the source of data in connectionName. Also define the schema of the input data in inputSchema and data format settings in formatSettings. The following example shows a source object for streaming ingestion:

"source": {
    "type": "connection",
    "connectionName": "confluent-connection",
    "inputSchema": [
        {
            "dataType": "string",
            "name": "timestamp"
        },
        {
            "dataType": "string",
            "name": "col1"
        }
    ],
    "formatSettings": {
        "format": "nd-json"
    }
},

mappings: In this array, describe how the input fields of the source data map to the columns of the target table. A mapping for the __time column is always required. Other mappings may be optional when useSchemaDiscovery is set to true. See Map and transform data with input expressions for details and usage notes.

useSchemaDiscovery: Set to true for Polaris to automatically discover the schema of the data being ingested. This property only applies to streaming ingestion jobs into flexible tables. With schema auto-discovery, Polaris ingests all fields as dimension columns using the identity mapping. Exclude fields from ingestion by declaring them in dimensionExclusions. To ingest certain fields into measure columns, or to transform data during ingestion, define the mappings between these fields and the table columns in mappings. By default, useSchemaDiscovery is set to false. In this case, you must define each mapping between the source data and the target table. When schema auto-discovery is disabled, Polaris ignores any unmapped input fields.

dimensionExclusions: Deny list containing the input fields you want to exclude from ingestion. Applies when useSchemaDiscovery is true. For example:

{
    "type": "streaming",
    ...
    "useSchemaDiscovery": true,
    "dimensionExclusions": ["col1", "col2"]
}

readFromPoint: Set to earliest or latest to instruct Polaris to read data from the earliest or latest point available in the stream, respectively. For a given table and topic or stream, Polaris preserves the reading checkpoint of the topic or stream. If the same topic or stream is used in a new connection or new ingestion jobs, the reading checkpoint is still maintained for the table. Polaris resets the reading checkpoint when the table has a new streaming ingestion job that reads from a different topic or stream. To manually reset the offset from which Polaris ingests streaming data, see Reset streaming job offset.
See the Job v2 API documentation for a description of all parameters.
The following example shows a job spec for a streaming ingestion job:
{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": true,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
}
Sample request
Send a POST request to the /v2/jobs endpoint to create a streaming ingestion job.
You can create multiple ingestion jobs with the same connection. For example, you can ingest event data from a topic into both a detail table and an aggregate table, as the sketch below shows.
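As a minimal sketch, the following derives two job specs from one shared connection, varying only the target. The table names here are hypothetical, and several required fields are omitted for brevity:

# Sketch: reuse one connection across two streaming jobs.
# The table names below are hypothetical examples.
base_spec = {
    "type": "streaming",
    "source": {
        "type": "connection",
        "connectionName": "example-connection"
        # inputSchema and formatSettings omitted for brevity
    },
    "readFromPoint": "earliest"
    # mappings omitted for brevity
}
detail_job = {**base_spec, "target": {"type": "table", "tableName": "example-detail-table"}}
aggregate_job = {**base_spec, "target": {"type": "table", "tableName": "example-aggregate-table"}}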
The following example request creates an ingestion job using the example-connection connection for a flexible table named example-table:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": true,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
}'
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

# Read the API key from the environment
apikey = os.getenv("POLARIS_API_KEY")

# Job spec: stream from example-connection into example-table,
# with schema auto-discovery enabled and two fields excluded
payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": True,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
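The creation response includes the generated job ID (see the sample response below). As a small sketch, you can capture it from the response for later status checks or for resetting the job:

# Sketch: capture the job ID and execution status from the creation response.
job = response.json()
print(job["id"], job["executionStatus"])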
Sample response
The following example shows a successful response:
{
    "source": {
        "connectionName": "example-connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ],
        "formatSettings": {
            "flattenSpec": {},
            "format": "nd-json"
        },
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "useSchemaDiscovery": true,
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "readFromPoint": "earliest",
    "type": "streaming",
    "id": "e9b77d70-1b01-42a9-aa1c-56f764d105d1",
    "target": {
        "tableName": "example-table",
        "intervals": [],
        "type": "table"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-03-23T23:00:57.235117332Z",
    "lastUpdatedTimestamp": "2023-03-23T23:00:57.235117332Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}
Create a streaming ingestion job without auto-discovery
When schema auto-discovery for ingestion jobs is disabled ("useSchemaDiscovery": false), you must define all the input fields in inputSchema and list how the input fields map to the table columns in mappings. You can create either a strict or flexible table.
Sample request
The following example request creates a streaming ingestion job with schema auto-discovery disabled:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "useSchemaDiscovery": false,
    "readFromPoint": "latest"
}'
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "useSchemaDiscovery": False,
    "readFromPoint": "latest"
})

headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a successful response:
{
    "source": {
        "connectionName": "example-connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ],
        "formatSettings": {
            "flattenSpec": {},
            "format": "nd-json"
        },
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "dimensionExclusions": [],
    "useSchemaDiscovery": false,
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "readFromPoint": "latest",
    "type": "streaming",
    "id": "a2ad0336-9582-4404-bc44-82f20747b536",
    "target": {
        "tableName": "example-table",
        "intervals": [],
        "type": "table"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-04-14T18:48:25.750490866Z",
    "lastUpdatedTimestamp": "2023-04-14T18:48:25.750490866Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}
Reset streaming job offset
For a given table and event source, such as a Kafka topic or Kinesis stream, Polaris retains the position from which it last successfully ingested the streaming data. Polaris associates the topic or stream name with the table regardless of the connection name. Subsequent ingestion jobs with the same source and target read from the last stored position. When setting up a new ingestion job that reads from a different topic or stream, Polaris ingests from the read point configured on the job.
To restore the checkpoint to the earliest or latest point of the event source, submit a POST request to reset the streaming job. You can issue the reset request on any running streaming job. After you reset the streaming job, Polaris ingests the event data using the offset set in readFromPoint for the running job.

Resetting the job may cause Polaris to skip events or read them twice, resulting in missing or duplicate data.
Sample request
The following example shows how to reset a streaming ingestion job with the job ID db6a3110-d6c3-4a63-86b2-41d51a65ce11:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset' \
--user ${POLARIS_API_KEY}:
import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset"
apikey = os.getenv("POLARIS_API_KEY")

payload = {}
headers = {
    'Authorization': f'Basic {apikey}'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response.
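As a minimal sketch, you can verify the outcome of the reset from the status code returned in the Python example above:

# Sketch: confirm the reset request succeeded.
if response.status_code == 200:
    print("Streaming job reset accepted")
else:
    print(f"Reset failed: {response.status_code} {response.text}")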
Learn more
See the following topics for more information: