Ingest batch data by API
After you create a table and upload files to Imply Polaris, you can ingest the data into the table. You can launch, track, and manage ingestion jobs using the Jobs v2 API. This topic walks you through ingesting data from uploaded files into a table using the API. To ingest streaming data instead, see Load event data.
Prerequisites
Before starting batch ingestion, you need the following:
- Files uploaded to Polaris. To upload files using the Files API, see Upload files.
- The organization name and table name. The examples in this topic apply to an aggregate table named `Koalas Subset`.
- A table with a defined schema. You can create a schema for an existing table using the UI or API. The examples in this topic use the aggregate table `Koalas Subset`, created in Create a table with a schema, with the following schema:
  - `__time`: the primary timestamp
  - `city`: a string dimension
  - `session`: a string dimension
  - `max_session_length`: a long measure
- An API key with the `ManageIngestionJobs` role. In the examples below, the key value is stored in the environment variable `POLARIS_API_KEY`. See API key authentication to obtain an API key and assign permissions. See User roles reference for more information on roles and their permissions.
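The Python examples in this topic authenticate by sending the API key in a Basic authorization header. Here is a minimal sketch of that setup as a reusable helper, assuming the key is exported in the `POLARIS_API_KEY` environment variable (the helper name is illustrative, not part of the Polaris API):

```python
import os

def polaris_headers(env_var: str = "POLARIS_API_KEY") -> dict:
    """Build the request headers used throughout this topic's Python examples."""
    apikey = os.getenv(env_var)
    if apikey is None:
        # Fail fast with a clear message rather than sending "Basic None".
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {
        "Authorization": f"Basic {apikey}",
        "Content-Type": "application/json",
    }
```

You can then pass `polaris_headers()` as the `headers` argument to any of the `requests` calls shown below.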
Load data into a table
Launch a batch ingestion job to import data from your uploaded files to a destination table in Polaris.
Your destination table must have a defined schema before ingestion.
If you submit an ingestion job to a table whose status is Setup incomplete, Polaris returns a `400 Bad Request` error.
See Table status reference for a list of available table statuses.
To launch an ingestion job, submit a `POST` request to the Jobs v2 API and pass the job specification as the request payload.
The job specification is a JSON object that requires the following fields:
- `type`: String representing the type of job. Set this property to `batch` for batch ingestion.
- `target`: Object describing the destination for ingested data. Within the `target` object, set the `type` to `table` and specify the Polaris table name in `tableName`. For example:

  ```json
  "target": {
      "type": "table",
      "tableName": "Koalas Subset"
  },
  ```

- `source`: Object describing the source of input data. Within the `source` object, describe the type of source data, the files to ingest, the schema of the input data, and the data format settings. All the files listed in an ingestion job must have the same format, such as newline-delimited JSON. The following example shows a `source` object for batch ingestion:

  ```json
  "source": {
      "type": "uploaded",
      "fileList": [
          "kttm-2019-08-19.json.gz",
          "kttm-2019-08-20.json.gz"
      ],
      "inputSchema": [
          {
              "dataType": "string",
              "name": "timestamp"
          },
          {
              "dataType": "string",
              "name": "city"
          },
          {
              "dataType": "string",
              "name": "session"
          },
          {
              "dataType": "long",
              "name": "session_length"
          }
      ],
      "formatSettings": {
          "format": "nd-json"
      }
  },
  ```

- `mappings`: Array of objects describing the relationship between the input fields of the source data and the columns of the Polaris table. See Map and transform data with input expressions for transformations you can apply to the input data. The following `mappings` example shows these relationships:
  - The `city` and `session` fields map directly to the table columns without transformation.
  - The timestamp is parsed from the `timestamp` input field and mapped to `__time`. See Timestamp for details on the input field requirements and expressions for time.
  - The `MAX` aggregator is applied to the `session_length` input field, and the result is mapped to `max_session_length`.

  ```json
  "mappings": [
      {
          "columnName": "__time",
          "expression": "TIME_PARSE(\"timestamp\")"
      },
      {
          "columnName": "city",
          "expression": "\"city\""
      },
      {
          "columnName": "session",
          "expression": "\"session\""
      },
      {
          "columnName": "max_session_length",
          "expression": "MAX(\"session_length\")"
      }
  ]
  ```
If the table schema does not match the schema of the source data, Polaris ignores the unmapped source fields. Polaris stores null values for unmapped columns, as well as when it cannot resolve data types, for example when string fields in the source data are mapped to numeric columns in the table schema.
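Because unmapped source fields are silently ignored, it can help to sanity-check a job specification before submitting it. The helper below is an illustrative sketch, not part of the Polaris API: it naively treats every double-quoted identifier in a mapping expression as an input-field reference and reports input fields that no mapping uses.

```python
import re

def unreferenced_input_fields(spec: dict) -> list:
    """Return names from source.inputSchema that no mapping expression references.

    Illustrative helper only: it assumes every double-quoted identifier in a
    mapping expression is a reference to an input field.
    """
    input_fields = {f["name"] for f in spec["source"]["inputSchema"]}
    referenced = set()
    for mapping in spec.get("mappings", []):
        referenced.update(
            re.findall(r'"([A-Za-z_][A-Za-z0-9_]*)"', mapping["expression"])
        )
    return sorted(input_fields - referenced)

# Abbreviated spec: session_length is declared but never mapped.
spec = {
    "source": {"inputSchema": [
        {"dataType": "string", "name": "timestamp"},
        {"dataType": "string", "name": "city"},
        {"dataType": "long", "name": "session_length"},
    ]},
    "mappings": [
        {"columnName": "__time", "expression": 'TIME_PARSE("timestamp")'},
        {"columnName": "city", "expression": '"city"'},
    ],
}
print(unreferenced_input_fields(spec))  # → ['session_length']
```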
Sample request
The following example shows how to load data from `kttm-2019-08-19.json.gz` and `kttm-2019-08-20.json.gz` into `Koalas Subset`:
- cURL
- Python
curl --location --request POST 'https://ORGANIZATION_NAME.api.imply.io/v2/jobs' \
-u ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a response to a successful ingestion job launch:
{
"type": "batch",
"id": "efb35e3e-406e-4127-ad2e-280fede4f431",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-08-09T22:34:46.716017658Z",
"lastUpdatedTimestamp": "2022-08-09T22:34:46.716017658Z",
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
]
}
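The `id` field in the launch response identifies the job in all subsequent requests. A small sketch of extracting it, using an abbreviated version of the response above:

```python
def job_id_from_response(job: dict) -> str:
    """Extract the job ID used in subsequent /v2/jobs/{id} requests."""
    return job["id"]

# Abbreviated launch response from the sample above.
launch_response = {
    "type": "batch",
    "id": "efb35e3e-406e-4127-ad2e-280fede4f431",
    "executionStatus": "pending",
}
print(job_id_from_response(launch_response))  # → efb35e3e-406e-4127-ad2e-280fede4f431
```

With `requests`, you would typically call `response.json()` on the launch response and pass the result to this helper.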
Monitor ingestion job progress
To monitor the progress of your ingestion job, issue a `GET` request to the Jobs v2 API with the job ID in the path. For example, `/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431`.
Sample request
The following example shows how to monitor the progress of your ingestion job:
- cURL
- Python
curl --location --request GET 'https://ORGANIZATION_NAME.api.imply.io/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431' \
-u ${POLARIS_API_KEY}:
import os
import requests
url = "https://ORGANIZATION_NAME.api.imply.io/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431"
apikey = os.getenv("POLARIS_API_KEY")
headers = {
    'Authorization': f'Basic {apikey}'
}
response = requests.get(url, headers=headers)
print(response.text)
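The job's progress is reported in `executionStatus`. One way to wait for a job to finish is to poll this endpoint until a terminal status is reached. The sketch below injects the request as a callable so the loop is independent of the HTTP client; the terminal values `failed` and `canceled` are assumptions about the full status set, alongside `completed`, which appears in this topic.

```python
import time

# Statuses after which the job will no longer change. "completed" appears in
# this topic; "failed" and "canceled" are assumed terminal values.
TERMINAL_STATUSES = {"completed", "failed", "canceled"}

def wait_for_job(fetch_job, poll_seconds=10, max_polls=360):
    """Poll until the job reaches a terminal executionStatus.

    `fetch_job` is a callable returning the job JSON as a dict, for example
    a wrapper around the GET request shown above.
    """
    for _ in range(max_polls):
        job = fetch_job()
        if job["executionStatus"] in TERMINAL_STATUSES:
            return job
        time.sleep(poll_seconds)
    raise TimeoutError("job did not finish within the polling window")
```

In practice, `fetch_job` would be something like `lambda: requests.get(url, headers=headers).json()` with the `url` and `headers` from the sample above.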
Sample response
The following example shows a successful response for ingestion job progress:
{
"type": "batch",
"id": "efb35e3e-406e-4127-ad2e-280fede4f431",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-08-09T22:34:46.716017658Z",
"lastUpdatedTimestamp": "2022-08-09T22:34:46.716017658Z",
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
]
}
Cancel an ingestion job
To cancel an ingestion job, issue a `PUT` request to the Jobs v2 API with the job ID in the path. For example, `/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431`. In the request body, send the full job document from the `GET` request, and set `desiredExecutionStatus` to `canceled`.
Sample request
The following example shows how to cancel an ingestion job in Polaris:
- cURL
- Python
curl --location --request PUT 'https://ORGANIZATION_NAME.api.imply.io/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431' \
-u ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "batch",
"id": "efb35e3e-406e-4127-ad2e-280fede4f431",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"desiredExecutionStatus": "canceled",
"createdBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"executionStatus": "completed",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-08-09T22:34:47Z",
"lastUpdatedTimestamp": "2022-08-09T22:34:47Z",
"startedTimestamp": "2022-08-09T22:35:15Z",
"completedTimestamp": "2022-08-09T22:37:02Z",
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"report": {
"totals": {
"numRowsProcessed": 505432,
"numRowsProcessedWithWarning": 0,
"numRowsSkippedByFilter": 0,
"numRowsSkippedByError": 48
},
"logs": [
{
"timestamp": "2022-08-09T22:35:17.544Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 302571, Line: 302594)"
},
{
"timestamp": "2022-08-09T22:35:17.357Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 291398, Line: 291420)"
},
{
"timestamp": "2022-08-09T22:35:17.157Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 278620, Line: 278641)"
},
{
"timestamp": "2022-08-09T22:35:16.976Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 265099, Line: 265119)"
},
{
"timestamp": "2022-08-09T22:35:16.755Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 250880, Line: 250899)"
}
]
}
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.api.imply.io/v2/jobs/efb35e3e-406e-4127-ad2e-280fede4f431"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"id": "efb35e3e-406e-4127-ad2e-280fede4f431",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"desiredExecutionStatus": "canceled",
"createdBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "service-account-docs-demo",
"userId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"executionStatus": "completed",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-08-09T22:34:47Z",
"lastUpdatedTimestamp": "2022-08-09T22:34:47Z",
"startedTimestamp": "2022-08-09T22:35:15Z",
"completedTimestamp": "2022-08-09T22:37:02Z",
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-19.json.gz",
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"report": {
"totals": {
"numRowsProcessed": 505432,
"numRowsProcessedWithWarning": 0,
"numRowsSkippedByFilter": 0,
"numRowsSkippedByError": 48
},
"logs": [
{
"timestamp": "2022-08-09T22:35:17.544Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 302571, Line: 302594)"
},
{
"timestamp": "2022-08-09T22:35:17.357Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 291398, Line: 291420)"
},
{
"timestamp": "2022-08-09T22:35:17.157Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 278620, Line: 278641)"
},
{
"timestamp": "2022-08-09T22:35:16.976Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 265099, Line: 265119)"
},
{
"timestamp": "2022-08-09T22:35:16.755Z",
"healthStatus": "error",
"code": "CannotProcessRow",
"message": "Unable to parse row [] (Path: s3://imply-saas-files/89581a37-bd1a-411a-9d80-2efb67f9c7be/02c2bd29-32d1-433e-8200-c06c4848d9ac.gz, Record: 250880, Line: 250899)"
}
]
}
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("PUT", url, headers=headers, data=payload)
print(response.text)
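The cancel request above hard-codes the full job document. A more reusable pattern, sketched below under the same assumptions as the samples, fetches the current job, flips `desiredExecutionStatus`, and sends the document back. The HTTP calls are injected as callables to keep the logic client-agnostic:

```python
def cancel_job(get_job, put_job):
    """Cancel a job by round-tripping its document.

    `get_job` returns the current job JSON as a dict; `put_job` sends the
    modified document back and returns the server's response.
    """
    job = get_job()
    job["desiredExecutionStatus"] = "canceled"
    return put_job(job)
```

With `requests`, this could be called as `cancel_job(lambda: requests.get(url, headers=headers).json(), lambda j: requests.put(url, headers=headers, json=j).json())`, where `url` points at `/v2/jobs/{id}`.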
Sample response
When you successfully cancel an ingestion job, the Jobs v2 API returns a `200 OK` status code and the details of the canceled job.
Learn more
See the following topics for more information:
- Jobs v2 API for reference on working with ingestion jobs in Polaris.
- Batch ingestion for strategies and concepts for batch ingestion.
- Load event data for ingesting streaming data into a table.
- Upload files for uploading files to Polaris using the API.