Create an ingestion job by API
To ingest data into Imply Polaris, you need to create an ingestion job. The ingestion job defines the shape of your source data, including the data type and input fields, and maps the input fields to the destination columns in Polaris.
This topic covers the basics of creating an ingestion job and the options for transforming source data using the Polaris API. For information on how to launch an ingestion job and transform input data in the UI, see Create an ingestion job.
The examples in this topic are based on batch ingestion from files.
Prerequisites
Before creating an ingestion job, you must first define a data source. See Ingestion sources overview for available ingestion sources.
This topic assumes that you have an API key with the ManageTables and ManageIngestionJobs permissions. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
Stage data
Before you launch an ingestion job, first define a data source, such as a connection to a streaming service or a data file.
- Create or download the JSON file containing your batch data. The examples in this topic use the demo data file docs-data.json.
- Send a POST request to the /v1/files endpoint to upload the file to staging, as shown in the sketch after this list.
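The following is a minimal Python sketch of the upload request. It assumes the staged file is sent as multipart form data under a part named file; replace the host placeholders with your organization's values.

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/files"
apikey = os.getenv("POLARIS_API_KEY")

# Upload the local file as a multipart form part named "file" (assumed part name).
with open("docs-data.json", "rb") as f:
    response = requests.post(
        url,
        headers={"Authorization": f"Basic {apikey}"},
        files={"file": ("docs-data.json", f)},
    )

print(response.status_code, response.text)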
Map and transform input fields
To launch an ingestion job, send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint.
In the request payload, the mappings array is where you specify how to map input fields to output columns. If the input schema differs from the target table schema, you can also use mappings to transform input data as part of the ingestion job.
Each object in mappings takes the following properties:
- columnName: The name of the output column.
- expression: A Druid SQL expression that describes how to compute the value for a column from a set of input fields. The expression can be a simple value, such as an input field name, or a combination of field names and Druid SQL functions. See Map and transform data with input expressions for details and supported operations.
- isAggregation: An optional Boolean property. It only applies to a flexible table that does not have the output column declared in the table schema. Set this property to true when the expression references an aggregation function for ingesting into a measure. Be sure to set isAggregation if you rely on automatically created tables. If isAggregation is not specified, Polaris refers to the table schema and applies the following behavior:
  - If columnName is declared as a measure, Polaris treats isAggregation as true.
  - If columnName is declared as a dimension, Polaris treats isAggregation as false.
  - If columnName is not declared in the table schema, Polaris treats isAggregation as false by default and assumes the expression applies to a dimension.

For an illustrative mappings fragment, see the example after this list.
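For example, the following fragment (with hypothetical column and input field names) sets isAggregation explicitly so that an automatically created flexible table stores the aggregated column as a measure rather than a dimension:

"mappings": [
  {
    "columnName": "total_sales",
    "expression": "SUM(\"sales\")",
    "isAggregation": true
  },
  {
    "columnName": "store",
    "expression": "\"store\""
  }
]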
For streaming ingestion jobs into flexible tables, you can enable schema auto-discovery. With schema auto-discovery, you may not need to define all the input fields or mappings: Polaris automatically discovers input fields and maps them to table dimensions using the identity mapping.
See the Jobs v1 API documentation for a description of all required parameters.
Sample request
The following example shows how to load data from docs-data.json into the demo_table table:
cURL
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
"type": "batch",
"source": {
"type": "uploaded",
"fileList": [
"docs-data.json"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\"))"
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\""
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\""
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")"
},
{
"columnName": "Zip code",
"expression": "\"zip_code\""
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", '\''T'\'', \"time\")))"
}
],
"target": {
"type": "table",
"tableName": "demo_table"
},
"createTableIfNotExists": true
}'
Python
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"source": {
"type": "uploaded",
"fileList": [
"docs-data.json"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\""
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\""
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")"
},
{
"columnName": "Zip code",
"expression": "\"zip_code\""
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": True
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))"
}
],
"target": {
"type": "table",
"tableName": "demo_table"
},
"createTableIfNotExists": True
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
In this example, Druid SQL functions perform the following transformations:
- TIME_PARSE(CONCAT("date", 'T', "time")) creates the __time output column by concatenating the date and time input fields and parsing the result as a timestamp.
- UPPER("neighborhood") transforms the values of the neighborhood input field to uppercase.
- MAX("rating") returns the maximum rating value for each rollup group.
- TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT("date", 'T', "time"))) adds one year to the parsed timestamp and returns a new timestamp in milliseconds.

For a concrete illustration of these transformations, see the sketch after this list.
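To make the transformations concrete, here is a standalone Python sketch that applies equivalent logic to one hypothetical input record. It only illustrates the expected values; during ingestion, Polaris evaluates the actual Druid SQL expressions.

from datetime import datetime, timezone

# Hypothetical input record matching the inputSchema in the sample request.
row = {"date": "2023-05-01", "time": "12:30:00", "neighborhood": "soho", "rating": 4}

# TIME_PARSE(CONCAT("date", 'T', "time")): parse the ISO 8601 string into Unix milliseconds.
ts = datetime.fromisoformat(row["date"] + "T" + row["time"]).replace(tzinfo=timezone.utc)
time_ms = int(ts.timestamp() * 1000)

# UPPER("neighborhood"): uppercase the value.
neighborhood = row["neighborhood"].upper()  # "SOHO"

# TIMESTAMPADD(YEAR, 1, ...): the same timestamp one year later, still in milliseconds.
next_inspection_ms = int(ts.replace(year=ts.year + 1).timestamp() * 1000)

print(time_ms, neighborhood, next_inspection_ms)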
Sample response
The following example shows a successful response:
{
"source": {
"fileList": [
"docs-data.json"
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
],
"type": "uploaded"
},
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\"",
"isAggregation": null
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\"",
"isAggregation": null
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")",
"isAggregation": null
},
{
"columnName": "Zip code",
"expression": "\"zip_code\"",
"isAggregation": null
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))",
"isAggregation": null
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"query": "INSERT INTO \"demo_table\"\nSELECT\n TIME_PARSE(CONCAT(\"date\", 'T', \"time\")) AS \"__time\",\n \"restaurant\" AS \"Restaurant\",\n \"cuisine_type\" AS \"Cuisine type\",\n UPPER(\"neighborhood\") AS \"Neighborhood\",\n \"zip_code\" AS \"Zip code\",\n TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))) AS \"Next inspection\",\n MAX(\"rating\") AS \"Rating\",\n COUNT(*) AS \"__count\"\nFROM TABLE(\n POLARIS_SOURCE(\n '{\"fileList\":[\"docs-data.json\"],\"formatSettings\":{\"flattenSpec\":{},\"format\":\"nd-json\"},\"inputSchema\":[{\"dataType\":\"string\",\"name\":\"date\"},{\"dataType\":\"string\",\"name\":\"time\"},{\"dataType\":\"string\",\"name\":\"restaurant\"},{\"dataType\":\"string\",\"name\":\"cuisine_type\"},{\"dataType\":\"string\",\"name\":\"neighborhood\"},{\"dataType\":\"long\",\"name\":\"zip_code\"},{\"dataType\":\"long\",\"name\":\"rating\"}],\"type\":\"uploaded\"}'\n )\n)\n\nGROUP BY 1, 2, 3, 4, 5, 6\nPARTITIONED BY DAY",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-11T23:15:02.525488003Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a8686-e6fe-71a7-b0d8-617b1cb48a44",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-11T23:15:02.525488003Z",
"spec": {
"source": {
"fileList": [
"docs-data.json"
],
"formatSettings": {
"flattenSpec": {},
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
],
"type": "uploaded"
},
"target": {
"tableName": "demo_table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))",
"isAggregation": null
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\"",
"isAggregation": null
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\"",
"isAggregation": null
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")",
"isAggregation": null
},
{
"columnName": "Zip code",
"expression": "\"zip_code\"",
"isAggregation": null
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT(\"date\", 'T', \"time\")))",
"isAggregation": null
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "demo_table",
"type": "table",
"intervals": []
},
"type": "batch",
"completedTimestamp": null,
"startedTimestamp": null
}
Notice the query property in the response body. Polaris returns the SQL equivalent of the ingestion job, which you can use as the basis for creating SQL-based ingestion jobs.
The SQL equivalent for this job follows:
INSERT INTO "demo_table"
SELECT
TIME_PARSE(CONCAT("date", 'T', "time")) AS "__time",
"restaurant" AS "Restaurant",
"cuisine_type" AS "Cuisine type",
UPPER("neighborhood") AS "Neighborhood",
"zip_code" AS "Zip code",
TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT("date", 'T', "time"))) AS "Next inspection",
MAX("rating") AS "Rating",
COUNT(*) AS "__count"
FROM TABLE(
POLARIS_SOURCE(
'{"fileList":["docs-data.json"],"formatSettings":{"flattenSpec":{},"format":"nd-json"},"inputSchema":[{"dataType":"string","name":"date"},{"dataType":"string","name":"time"},{"dataType":"string","name":"restaurant"},{"dataType":"string","name":"cuisine_type"},{"dataType":"string","name":"neighborhood"},{"dataType":"long","name":"zip_code"},{"dataType":"long","name":"rating"}],"type":"uploaded"}'
)
)
GROUP BY 1, 2, 3, 4, 5, 6
PARTITIONED BY DAY
To track the job status or cancel the job, see View and manage ingestion jobs by API.
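For example, the following Python sketch polls the job's execution status, assuming the Jobs v1 GET endpoint at /v1/projects/PROJECT_ID/jobs/JOB_ID and the status values shown in the sample response.

import os
import time
import requests

# Job ID from the "id" field of the create-job response (value from the sample response).
job_id = "018a8686-e6fe-71a7-b0d8-617b1cb48a44"

url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/{job_id}"
apikey = os.getenv("POLARIS_API_KEY")
headers = {"Authorization": f"Basic {apikey}"}

# Poll until the job leaves the pending/running states.
while True:
    job = requests.get(url, headers=headers).json()
    status = job.get("executionStatus")
    print(status)
    if status not in ("pending", "running"):
        break
    time.sleep(10)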
Example
The following Python example shows how to create an ingestion job and transform input data using the Jobs v1 API.
import os
import requests
import json
# Replace placeholders with your information
ORGANIZATION_NAME = "example"
REGION = "us-east-1"
CLOUD_PROVIDER = "aws"
PROJECT_ID = "12375ffx-f7x4-4f0x-a1a6-3b3424987ee0"
# Store the endpoint for Polaris Jobs API
url = f"https://{ORGANIZATION_NAME}.{REGION}.{CLOUD_PROVIDER}.api.imply.io/v1/projects/{PROJECT_ID}/jobs"
# Read the API key from the POLARIS_API_KEY environment variable.
# Do not hard-code your API key credentials in scripts and
# do not check them into version control systems.
# See https://docs.imply.io/polaris/api-keys for more information.
apikey = os.getenv("POLARIS_API_KEY")
# Supply the dictionary that contains the request payload.
payload = {
"type": "batch",
"source": {
"type": "uploaded",
"fileList": [
"docs-data.json"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "time"
},
{
"dataType": "string",
"name": "restaurant"
},
{
"dataType": "string",
"name": "cuisine_type"
},
{
"dataType": "string",
"name": "neighborhood"
},
{
"dataType": "long",
"name": "zip_code"
},
{
"dataType": "long",
"name": "rating"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))"
},
{
"columnName": "Restaurant",
"expression": "\"restaurant\""
},
{
"columnName": "Cuisine type",
"expression": "\"cuisine_type\""
},
{
"columnName": "Neighborhood",
"expression": "UPPER(\"neighborhood\")"
},
{
"columnName": "Zip code",
"expression": "\"zip_code\""
},
{
"columnName": "Rating",
"expression": "MAX(\"rating\")",
"isAggregation": true
},
{
"columnName": "Next inspection",
"expression": "TIMESTAMPADD(YEAR, 1, (TIME_PARSE(CONCAT(\"date\", 'T', \"time\"))))"
}
],
"target": {
"type": "table",
"tableName": "python_test"
},
"createTableIfNotExists": True
}
def make_post(url, payload, key):
payload = json.dumps(payload)
headers = {
'Authorization': f'Basic {key}',
'Content-Type': 'application/json'
}
return requests.post(url, headers=headers, data=payload)
response = make_post(url, payload, apikey)
print(response.json())
Usage tips for input expressions
Note the following when you define input expressions in an ingestion job:
- Always quote your input fields to avoid syntax errors. For example, "my_input_field".
- Escape the double quotes with \ for all input fields in expressions. For example, "expression": "\"my_input_field\"". The sketch after this list illustrates the escaping.
- Input expressions can only refer to fields present in the inputSchema defined in the job; in particular, they cannot refer to other expressions.
- If your data source contains a __time input field, Polaris treats the __time field as the primary timestamp. The __time input field must be a long data type containing milliseconds since the Unix epoch. For more information, see Timestamp expressions.
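The following Python sketch illustrates the escaping rule with a hypothetical input field: the \" sequences in the Python string produce the escaped double quotes that the JSON payload requires.

import json

# Hypothetical mapping that references the input field my_input_field.
mapping = {
    "columnName": "My column",
    "expression": "\"my_input_field\"",
}

# Prints: {"columnName": "My column", "expression": "\"my_input_field\""}
print(json.dumps(mapping))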
Learn more
See the following topics for more information:
- Jobs API documentation
- Create an ingestion job for more details on ingestion jobs.
- View and manage ingestion jobs by API for viewing details on an ingestion job.
- Push event data by API for pushing data from an application source into Polaris.
- Ingest data from files by API for ingesting uploaded files to a table in Polaris.
- Troubleshoot data ingestion for troubleshooting data ingestion.