Create a streaming ingestion job by API

Create a streaming ingestion job in Polaris to stream data into your tables from a source such as Apache Kafka, Confluent Cloud, or Amazon MSK.

In a streaming ingestion job, you specify the source of the input data, format settings of the data being ingested, how the input data maps to the table columns, and where in the stream to start ingesting data.

Before you continue in this topic, familiarize yourself with the basics of creating an ingestion job.

The concepts in this topic apply to all streaming jobs. For available streaming ingestion sources, see Ingestion sources overview.

Prerequisites

Before streaming data into Polaris, ensure that you have a target table and a connection that describes your data source.

This topic assumes that you have an API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. To obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.
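
The cURL examples pass the key with --user ${POLARIS_API_KEY}:, and the Python examples read it with os.getenv and send it in a Basic authorization header. If you plan to run the Python examples, the following minimal setup sketch shows the pieces they assume; replace ORGANIZATION_NAME, REGION, and CLOUD_PROVIDER with the values for your organization.

import os

# Base URL for the Jobs v2 API. Replace the placeholders with your
# organization name, region, and cloud provider.
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

# The examples in this topic read the API key from the POLARIS_API_KEY
# environment variable.
apikey = os.getenv("POLARIS_API_KEY")
if not apikey:
    raise RuntimeError("Set the POLARIS_API_KEY environment variable to your API key.")

# Headers used by the Python examples below.
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}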

Create a streaming ingestion job

Define a streaming ingestion job with the following properties:

  • type: Set the job type to streaming.

  • target: In this object, set the type to table, and assign the Polaris table name to tableName. For example:

    "target": {
      "type": "table",
      "tableName": "example-table"
    },
    
  • source: In this object, set the type to connection, and provide the name of the connection to your data source in connectionName. Also define the schema of the input data in inputSchema and the data format settings in formatSettings.

    The following example shows a source object for streaming ingestion:

    "source": {
        "type": "connection",
        "connectionName": "confluent-connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "col1"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    
  • mappings: In this array, describe how the input fields of the source data map to the columns of the target table. A mapping for the __time column is always required. Other mappings may be optional when useSchemaDiscovery is set to true. See Map and transform data with input expressions for details and usage notes.

  • useSchemaDiscovery: Set to true for Polaris to automatically discover the schema of the data being ingested. Applies only to streaming ingestion jobs that target flexible tables.

    With schema auto-discovery, Polaris ingests all fields as dimension columns using the identity mapping. Exclude fields from ingestion by declaring them in dimensionExclusions. To ingest certain fields into measure columns, or to transform data during ingestion, define the mappings between these fields and the table columns in mappings.

    By default, useSchemaDiscovery is set to false. In this case, you must define each mapping between the source data and the target table. When schema auto-discovery is disabled, Polaris ignores any unmapped input fields.

  • dimensionExclusions: Deny list containing the input fields you want to exclude from ingestion. Applies when useSchemaDiscovery is true. For example:

    {
      "type": "streaming",
      ...
      "useSchemaDiscovery": true,
      "dimensionExclusions": ["col1", "col2"]
    }
    
  • readFromPoint: Set to earliest or latest to instruct Polaris to read data from the earliest or latest point available in the stream, respectively.

    For a given table and topic or stream, Polaris preserves the reading checkpoint of the topic or stream. If the same topic or stream is used in a new connection or new ingestion jobs, the reading checkpoint is still maintained for the table. Polaris resets the reading checkpoint when the table has a new streaming ingestion job that reads from a different topic or stream. To manually reset the offset from which Polaris ingests streaming data, see Reset streaming job offset.

See the Jobs v2 API documentation for a description of all parameters.

The following example shows a job spec for a streaming ingestion job:

{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": true,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
}

Sample request

Send a POST request to the /v2/jobs endpoint to create a streaming ingestion job. You can create multiple ingestion jobs with the same connection—for example, to ingest event data from a topic into both a detail table and an aggregate table.

The following example request creates an ingestion job using the example-connection connection for a flexible table named example-table:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": true,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "useSchemaDiscovery": True,
    "dimensionExclusions": [
        "col1",
        "col2"
    ],
    "readFromPoint": "earliest"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

The following example shows a successful response:

{
    "source": {
        "connectionName": "example-connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            }
        ],
        "formatSettings": {
            "flattenSpec": {},
            "format": "nd-json"
        },
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        }
    ],
    "dimensionExclusions": [],
    "useSchemaDiscovery": false,
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "readFromPoint": "latest",
    "type": "streaming",
    "id": "e9b77d70-1b01-42a9-aa1c-56f764d105d1",
    "target": {
        "tableName": "example-table",
        "intervals": [],
        "type": "table"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-03-23T23:00:57.235117332Z",
    "lastUpdatedTimestamp": "2023-03-23T23:00:57.235117332Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}
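
The executionStatus in the response is pending while Polaris prepares the job. If you want to wait for the job to start running before sending data, a minimal polling sketch such as the following can help. It assumes the Jobs v2 API returns job details, including executionStatus, from a GET request to /v2/jobs/{jobId}; see the Jobs v2 API documentation for the exact endpoint and response fields.

import os
import time
import requests

# Job ID returned in the sample response above.
job_id = "e9b77d70-1b01-42a9-aa1c-56f764d105d1"
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/{job_id}"

apikey = os.getenv("POLARIS_API_KEY")
headers = {'Authorization': f'Basic {apikey}'}

# Poll until the job leaves the pending state. This assumes job details
# are available at GET /v2/jobs/{jobId} in the Jobs v2 API.
for _ in range(30):
    job = requests.get(url, headers=headers).json()
    if job.get("executionStatus") != "pending":
        break
    time.sleep(10)

print(job.get("executionStatus"))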

Create a streaming ingestion job without auto-discovery

When schema auto-discovery for ingestion jobs is disabled ("useSchemaDiscovery": false), you must define all the input fields in inputSchema and list how the input fields map to the table columns in mappings.

The target table can be either a strict or a flexible table.

Sample request

The following example request creates a streaming ingestion job with schema auto-discovery disabled:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "useSchemaDiscovery": false,
    "readFromPoint": "latest"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "example-table"
    },
    "source": {
        "type": "connection",
        "connectionName": "example-connection",
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "useSchemaDiscovery": False,
    "readFromPoint": "latest"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

The following example shows a successful response:

{
    "source": {
        "connectionName": "example-connection",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "country"
            },
            {
                "dataType": "string",
                "name": "city"
            }
        ],
        "formatSettings": {
            "flattenSpec": {},
            "format": "nd-json"
        },
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "country",
            "expression": "country"
        },
        {
            "columnName": "city",
            "expression": "city"
        }
    ],
    "dimensionExclusions": [],
    "useSchemaDiscovery": false,
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "readFromPoint": "latest",
    "type": "streaming",
    "id": "a2ad0336-9582-4404-bc44-82f20747b536",
    "target": {
        "tableName": "example-table",
        "intervals": [],
        "type": "table"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-04-14T18:48:25.750490866Z",
    "lastUpdatedTimestamp": "2023-04-14T18:48:25.750490866Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}

Reset streaming job offset

For a given table and event source, such as a Kafka topic or Kinesis stream, Polaris retains the position from which it last successfully ingested the streaming data. Polaris associates the topic or stream name with the table regardless of the connection name. Subsequent ingestion jobs with the same source and target read from the last stored position. When you set up a new ingestion job that reads from a different topic or stream, Polaris ingests from the read point configured on the job.

To restore the checkpoint to the earliest or latest point of the event source, submit a POST request to reset the streaming job. You can issue the reset request on any running streaming job.

After you reset the streaming job, Polaris ingests the event data using the offset set in readFromPoint for the running job.

Resetting the job may cause Polaris to skip events or read them twice, resulting in missing or duplicate data.

Sample request

The following example shows how to reset a streaming ingestion job with the job ID db6a3110-d6c3-4a63-86b2-41d51a65ce11:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset' \
--user ${POLARIS_API_KEY}:

Python

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/db6a3110-d6c3-4a63-86b2-41d51a65ce11/reset"

apikey = os.getenv("POLARIS_API_KEY")

payload = {}
headers = {
    'Authorization': f'Basic {apikey}'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response.

Learn more

See the following topics for more information:

  • Create a table by API
  • Ingest data from Kafka and MSK by API
  • Ingest data from Confluent Cloud by API
  • Ingest data from Amazon Kinesis by API
  • Jobs v2 API