
Create an ingestion job by API

info

Project-less regional API resources have been deprecated and will be removed by the end of September 2024.

You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023. For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID

Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.

To ingest data into Imply Polaris, you need to create an ingestion job. The ingestion job defines the shape of your source data, including the data type and input fields, and maps the input fields to the destination columns in Polaris.

This topic covers the basics of creating an ingestion job and the options for transforming source data using the Polaris API. For information on how to launch an ingestion job and transform input data in the UI, see Create an ingestion job.

The examples in this topic are based on batch ingestion from files.

Prerequisites

Before creating an ingestion job, you must first define a data source. See Ingestion sources overview for available ingestion sources.

This topic assumes that you have an API key with the ManageTables and ManageIngestionJobs permissions. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Stage data

Before you launch an ingestion job, define a data source, such as a connection to a streaming service or a data file.

  1. Create or download the JSON file containing your batch data. The examples in this topic use the demo data file docs-data.json.
  2. Send a POST request to the /v1/files endpoint to upload the file to staging.
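
For example, the following Python sketch uploads the demo file to staging with the requests library. It assumes the staging endpoint follows the same project-scoped URL pattern as the jobs endpoint and that the file is sent as a multipart form field named file; confirm the exact request format in the Files v1 API documentation.

import os
import requests

# Replace placeholders with your information
ORGANIZATION_NAME = "example"
REGION = "us-east-1"
CLOUD_PROVIDER = "aws"
PROJECT_ID = "12375ffx-f7x4-4f0x-a1a6-3b3424987ee0"

# Assumed project-scoped staging endpoint, mirroring the jobs endpoint pattern
url = f"https://{ORGANIZATION_NAME}.{REGION}.{CLOUD_PROVIDER}.api.imply.io/v1/projects/{PROJECT_ID}/files"

apikey = os.getenv("POLARIS_API_KEY")

# Upload the demo data file; "file" is an assumed multipart form field name
with open("docs-data.json", "rb") as f:
    response = requests.post(url, auth=(apikey, ""), files={"file": f})

print(response.status_code, response.json())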

Map and transform input fields

To launch an ingestion job, send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint. In the request payload, the mappings array is where you specify how to map input fields to output columns. If the input schema is different from the target table schema, you can also use mappings to transform input data as part of the ingestion job.

Each object in mappings takes the following properties:

  • columnName: The name of the output column.

  • expression: A Druid SQL expression that describes how to compute the value for a column from a set of input fields. The expression can be a simple value, such as an input field name, or a combination of field names and Druid SQL functions. See Map and transform data with input expressions for details and supported operations.

  • isAggregation: An optional Boolean property. It applies only to flexible tables where the output column is not declared in the table schema. Set this property to true when the expression uses an aggregation function to ingest into a measure. Be sure to set isAggregation if you rely on automatically created tables.

    If isAggregation is not specified, Polaris refers to the table schema. The following behavior holds:

    • If columnName is declared as a measure, Polaris treats isAggregation as true.
    • If columnName is declared as a dimension, Polaris treats isAggregation as false.
    • If columnName is not declared in the table schema, Polaris treats isAggregation as false by default and assumes the expression applies to a dimension.

For streaming ingestion jobs into flexible tables, you can enable schema auto-discovery. With schema auto-discovery, you don't have to define all the input fields or mappings: Polaris automatically discovers input fields and maps them to table dimensions using the identity mapping.

See the Jobs v1 API documentation for a description of all required parameters.

Sample request

The following example shows how to load data from docs-data.json into the demo_table table:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data '{
  "type": "batch",
  "source": {
    "type": "uploaded",
    "fileList": [
      "docs-data.json"
    ],
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "date"
      },
      {
        "dataType": "string",
        "name": "time"
      },
      {
        "dataType": "string",
        "name": "restaurant"
      },
      {
        "dataType": "string",
        "name": "cuisine_type"
      },
      {
        "dataType": "string",
        "name": "neighborhood"
      },
      {
        "dataType": "long",
        "name": "zip_code"
      },
      {
        "dataType": "long",
        "name": "rating"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
    },
    {
      "columnName": "Restaurant",
      "expression": "\"restaurant\""
    },
    {
      "columnName": "Cuisine type",
      "expression": "\"cuisine_type\""
    },
    {
      "columnName": "Neighborhood",
      "expression": "UPPER(\"neighborhood\")"
    },
    {
      "columnName": "Zip code",
      "expression": "\"zip_code\""
    },
    {
      "columnName": "Rating",
      "expression": "MAX(\"rating\")",
      "isAggregation": true
    },
    {
      "columnName": "Next inspection",
      "expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\")))"
    }
  ],
  "target": {
    "type": "table",
    "tableName": "demo_table"
  },
  "createTableIfNotExists": true
}'

In this example, Druid SQL functions perform the following transformations:

  • TIME_PARSE(CONCAT("date", 'T', "time")) creates the __time output column by parsing the concatenation of the date and time input fields.
  • UPPER("neighborhood") converts the values of the neighborhood input field to uppercase.
  • MAX("rating") returns the maximum of the rating values, ingested as a measure.
  • TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT("date", 'T', "time"))) adds one year to the parsed timestamp and returns the resulting timestamp in milliseconds.
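
To see what these expressions produce, the following Python sketch approximates the same transformations for a single hypothetical input record. It is an illustration only and is not part of the ingestion job; Polaris evaluates the Druid SQL expressions itself during ingestion.

from datetime import datetime

# Hypothetical input record matching the inputSchema above
record = {"date": "2023-06-01", "time": "12:30:00", "neighborhood": "soho", "rating": 4}

# TIME_PARSE(CONCAT("date", 'T', "time")): parse the combined timestamp for __time
event_time = datetime.fromisoformat(record["date"] + "T" + record["time"])

# UPPER("neighborhood"): "soho" -> "SOHO"
neighborhood = record["neighborhood"].upper()

# TIMESTAMPADD(YEAR, 1, ...): the same timestamp one year later
next_inspection = event_time.replace(year=event_time.year + 1)

print(event_time, neighborhood, next_inspection)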

Sample response

The following example shows a successful response:

{
"source": {
"fileList": [
"docs-data.json"
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
],
"type": "uploaded"
},
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\"",
"isAggregation": null
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\"",
"isAggregation": null
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")",
"isAggregation": null
},
{
"columnName": "Zip code",
"expression": "\"zip_code\"",
"isAggregation": null
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))",
"isAggregation": null
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"query": "INSERT INTO \"demo_table\"\nSELECT\n TIME_PARSE(CONCAT(\"date\", 'T', \"time\")) AS \"__time\",\n \"restaurant\" AS \"Restaurant\",\n \"cuisine_type\" AS \"Cuisine type\",\n UPPER(\"neighborhood\") AS \"Neighborhood\",\n \"zip_code\" AS \"Zip code\",\n TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))) AS \"Next inspection\",\n MAX(\"rating\") AS \"Rating\",\n COUNT(*) AS \"__count\"\nFROM TABLE(\n POLARIS_SOURCE(\n '{\"fileList\":[\"docs-data.json\"],\"formatSettings\":{\"flattenSpec\":{},\"format\":\"nd-json\"},\"inputSchema\":[{\"dataType\":\"string\",\"name\":\"date\"},{\"dataType\":\"string\",\"name\":\"time\"},{\"dataType\":\"string\",\"name\":\"restaurant\"},{\"dataType\":\"string\",\"name\":\"cuisine_type\"},{\"dataType\":\"string\",\"name\":\"neighborhood\"},{\"dataType\":\"long\",\"name\":\"zip_code\"},{\"dataType\":\"long\",\"name\":\"rating\"}],\"type\":\"uploaded\"}'\n )\n)\n\nGROUP BY 1, 2, 3, 4, 5, 6\nPARTITIONED BY DAY",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-11T23:15:02.525488003Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a8686-e6fe-71a7-b0d8-617b1cb48a44",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-11T23:15:02.525488003Z",
"spec": {
"source": {
"fileList": [
"docs-data.json"
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
],
"type": "uploaded"
},
"target": {
"tableName": "demo_table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\"",
"isAggregation": null
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\"",
"isAggregation": null
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")",
"isAggregation": null
},
{
"columnName": "Zip code",
"expression": "\"zip_code\"",
"isAggregation": null
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))",
"isAggregation": null
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "demo_table",
"type": "table",
"intervals": []
},
"type": "batch",
"completedTimestamp": null,
"startedTimestamp": null
}
tip

Notice the query property in the response body. Polaris returns the SQL equivalent of the ingestion job, which you can use as the basis for creating SQL-based ingestion jobs.

The SQL equivalent of the sample ingestion job:

INSERT INTO "demo_table"
SELECT
  TIME_PARSE(CONCAT("date", 'T', "time")) AS "__time",
  "restaurant" AS "Restaurant",
  "cuisine_type" AS "Cuisine type",
  UPPER("neighborhood") AS "Neighborhood",
  "zip_code" AS "Zip code",
  TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT("date", 'T', "time"))) AS "Next inspection",
  MAX("rating") AS "Rating",
  COUNT(*) AS "__count"
FROM TABLE(
  POLARIS_SOURCE(
    '{"fileList":["docs-data.json"],"formatSettings":{"flattenSpec":{},"format":"nd-json"},"inputSchema":[{"dataType":"string","name":"date"},{"dataType":"string","name":"time"},{"dataType":"string","name":"restaurant"},{"dataType":"string","name":"cuisine_type"},{"dataType":"string","name":"neighborhood"},{"dataType":"long","name":"zip_code"},{"dataType":"long","name":"rating"}],"type":"uploaded"}'
  )
)
GROUP BY 1, 2, 3, 4, 5, 6
PARTITIONED BY DAY
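
If you want to reuse this SQL directly, you can submit it as a SQL-based ingestion job to the same jobs endpoint. The following Python sketch assumes a job spec of type sql with the statement in a query property; confirm the exact spec shape in the Jobs v1 API documentation before relying on it.

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")

# Paste the SQL returned in the "query" property of the job response here
sql = 'INSERT INTO "demo_table" SELECT ... PARTITIONED BY DAY'

# Assumed spec shape for a SQL-based ingestion job
payload = {"type": "sql", "query": sql}

response = requests.post(url, auth=(apikey, ""), json=payload)
print(response.json())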

To track the job status or cancel the job, see View and manage ingestion jobs by API.
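
For example, a minimal Python sketch that polls the job until it finishes might look like the following. It assumes the job details endpoint at /v1/projects/PROJECT_ID/jobs/JOB_ID and the executionStatus values shown in the sample response; see View and manage ingestion jobs by API for the authoritative reference.

import os
import time
import requests

# The job ID comes from the "id" property of the create-job response
JOB_ID = "018a8686-e6fe-71a7-b0d8-617b1cb48a44"

url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/{JOB_ID}"
apikey = os.getenv("POLARIS_API_KEY")

# Poll until the job leaves the pending/running states
while True:
    job = requests.get(url, auth=(apikey, "")).json()
    status = job.get("executionStatus")
    print(status)
    if status not in ("pending", "running"):
        break
    time.sleep(10)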

Example

The following Python example shows how to create an ingestion job and transform input data using the Jobs v1 API.

import os
import requests
import json

# Replace placeholders with your information
ORGANIZATION_NAME = "example"
REGION = "us-east-1"
CLOUD_PROVIDER = "aws"
PROJECT_ID = "12375ffx-f7x4-4f0x-a1a6-3b3424987ee0"

# Store the endpoint for Polaris Jobs API
url = f"https://{ORGANIZATION_NAME}.{REGION}.{CLOUD_PROVIDER}.api.imply.io/v1/projects/{PROJECT_ID}/jobs"

# Supply the API key in the following string variable.
# Do not supply your API key credentials in production scripts and
# do not check them into version control systems.
# See https://docs.imply.io/polaris/api-keys for more information.
apikey = os.getenv("POLARIS_API_KEY")

# Supply the dictionary that contains the request payload.
payload = {
    "type": "batch",
    "source": {
        "type": "uploaded",
        "fileList": [
            "docs-data.json"
        ],
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "time"
            },
            {
                "dataType": "string",
                "name": "restaurant"
            },
            {
                "dataType": "string",
                "name": "cuisine_type"
            },
            {
                "dataType": "string",
                "name": "neighborhood"
            },
            {
                "dataType": "long",
                "name": "zip_code"
            },
            {
                "dataType": "long",
                "name": "rating"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
        },
        {
            "columnName": "Restaurant",
            "expression": "\"restaurant\""
        },
        {
            "columnName": "Cuisine type",
            "expression": "\"cuisine_type\""
        },
        {
            "columnName": "Neighborhood",
            "expression": "UPPER(\"neighborhood\")"
        },
        {
            "columnName": "Zip code",
            "expression": "\"zip_code\""
        },
        {
            "columnName": "Rating",
            "expression": "MAX(\"rating\")",
            "isAggregation": True
        },
        {
            "columnName": "Next inspection",
            "expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))"
        }
    ],
    "target": {
        "type": "table",
        "tableName": "python_test"
    },
    "createTableIfNotExists": True
}

def make_post(url, payload, key):
    payload = json.dumps(payload)
    headers = {
        'Authorization': f'Basic {key}',
        'Content-Type': 'application/json'
    }
    return requests.post(url, headers=headers, data=payload)

response = make_post(url, payload, apikey)

print(response.json())
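
Continuing from the response object above, you can fail fast on HTTP errors and capture the job ID for later status checks:

response.raise_for_status()  # surface 4xx/5xx errors immediately
job = response.json()
print("Created job:", job["id"], "status:", job["executionStatus"])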

Usage tips for input expressions

Note the following when you define input expressions in an ingestion job:

  • Always quote your input fields to avoid syntax errors. For example, "my_input_field".
  • Escape the double quotes with \ for all input fields in expressions. For example, "expression":"\"my_input_field\"".
  • Input expressions can only reference fields present in the inputSchema defined for the job; they cannot reference the expressions of other mappings.
  • If your data source contains a __time input field, Polaris treats the __time field as the primary timestamp. The __time input field must be a long data type in milliseconds since Unix epoch format. For more information, see Timestamp expressions.
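
As a concrete illustration of the quoting rules, the following Python fragment builds two mappings for a hypothetical input field named city. In Python source you can wrap the expression in single quotes; once the payload is serialized to JSON, the inner double quotes are escaped with \" as shown in the comment.

import json

# Single-quoted Python strings avoid extra escaping of the inner double quotes
mappings = [
    {"columnName": "City", "expression": '"city"'},
    {"columnName": "City upper", "expression": 'UPPER("city")'},
]

# Serialized form sent to Polaris: {"columnName": "City", "expression": "\"city\""}
print(json.dumps(mappings[0]))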

Learn more

See the following topics for more information: