Ingest data from a table
You can use the Jobs v1 API to load data from an existing Polaris table into another table.
For a list of all ingestion options, see Ingestion sources overview.
Common uses for table-to-table ingestion include:
- Migrating data from a detail table to an aggregate table.
- Aging out data for a specific time range.
When you perform a table-to-table batch ingestion, Polaris takes a snapshot of the data in the source table at the time the ingestion job launches. If there is ongoing streaming ingestion to the source table, data that arrives after the snapshot is not ingested into the destination table.
Table-to-table ingestion supports the following modes:
`append`
: Appends data to a table. If the source and destination tables are the same table, duplicates the existing data within the table.

`replace`
: Replaces data for the specified interval in the destination table.
During table-to-table ingestion, you cannot drop the source table or delete data from it.
The source table for the example in this topic is a detail table, `Koalas Source`, with a schema that includes the following columns:

`__time`
: the primary timestamp

`city`
: a string dimension

`session`
: a string dimension

`session_length`
: a long dimension
To follow along, you can create the table and load the data from `kttm-source-table.json.tar.gz`.
The destination table is an aggregate table, `Koalas Rollup`, with the following schema:

`__time`
: the primary timestamp

`city`
: a string dimension

`count`
: a long measure indicating the number of aggregated records

`max_session_length`
: a long measure
The destination table removes the `session` field from the source table and adds a new `count` column. Because the aggregate table has rollup enabled, instead of storing each `session_length` value, the destination table stores only the maximum `session_length` value for each aggregated row.
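As an illustration with made-up values, rollup collapses source rows that share the same dimension values into a single aggregated row. The uppercase `city` value comes from the mapping expression shown later in this topic:

```text
Source rows (Koalas Source):
  __time                city      session  session_length
  2022-06-01T00:00:00Z  Helsinki  s1       10
  2022-06-01T00:00:00Z  Helsinki  s2       25
  2022-06-01T00:00:00Z  Helsinki  s3       15

Destination row (Koalas Rollup):
  __time                city      count  max_session_length
  2022-06-01T00:00:00Z  HELSINKI  3      25
```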
Prerequisites
Before starting batch ingestion from a table, you need the following:
- A table in Polaris with your source data. You cannot ingest from a source table that has a status of `UNAVAILABLE`.
- An API key with the `ManageIngestionJobs` permission. In the examples below, the key value is stored in the environment variable named `POLARIS_API_KEY`.
For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.
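For example, a minimal Python sketch that reads the key from the environment and fails fast if it isn't set (the error message is illustrative):

```python
import os

# The examples in this topic read the API key from POLARIS_API_KEY.
apikey = os.environ.get("POLARIS_API_KEY")
if not apikey:
    raise SystemExit("Set the POLARIS_API_KEY environment variable first.")
```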
You do not have to create the destination table before starting an ingestion job. When you set `createTableIfNotExists` to `true` in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
The source table and the destination table can be the same table, but you cannot make changes to the underlying table schema in this case. For example, you can't drop a column. If you need to make schema changes, use a new table as the destination.
Load data from one table into another
Launch a batch ingestion job to migrate data from your source table to a destination table in Polaris.
To launch an ingestion job, submit a `POST` request to the Jobs v1 API and pass the job specification as the payload of the request.

The job specification is a JSON object that requires the following fields:
- `type`: String representing the type of job. Set this property to `batch` for batch ingestion.

- `source`: Object describing the source of input data. Within the `source` object, set the `type` to `table` and the `tableName` to the source table name. The following example shows a `source` object for batch ingestion from a table:

  ```json
  "source": {
      "type": "table",
      "tableName": "Koalas Source"
  },
  ```

  You can optionally supply an ISO 8601-formatted `interval` to limit ingestion to source data within a specific time range.

- `target`: Object describing the destination table for ingested data. Within the `target` object, set the `type` to `table` and specify the Polaris table name in `tableName`. For example:

  ```json
  "target": {
      "type": "table",
      "tableName": "Koalas Rollup"
  }
  ```

  The ingestion job spec in this example directs Polaris to automatically create the table if it doesn't exist, since `createTableIfNotExists` is `true`.

- `ingestionMode`: Either `append` or `replace`. Defaults to `append`. If the source and destination tables are the same, an `append` job duplicates the data. A `replace` job overwrites the data for the `intervals` specified in the target. For a complete replace-mode example, see the sketch after this list.

  If you are using `replace` ingestion mode, also include `intervals` in the `target` property. Note that `interval` in the source is singular, whereas `intervals` in the target is plural. Specify one or more time intervals to replace in ISO 8601 format. For example:

  ```json
  "target": {
      "type": "table",
      "tableName": "Koalas Rollup",
      "intervals": ["2022-06-01T00:00:00Z/2022-06-01T08:00:00Z"]
  }
  ```

  See Jobs v1 API for more detail on requirements for intervals.
- `mappings`: Array describing how the columns from the source table map to the columns in the destination table. Enclose each input field within an expression in quotation marks. See Map and transform input fields for details and usage notes.

  For table-to-table ingestion, you can omit the timestamp field mapping from the source table. Polaris automatically detects the `__time` column.

  The following `mappings` example demonstrates two relationships:

  - The `city` mapping demonstrates a simple uppercase transformation.
  - The `max_session_length` mapping demonstrates the `MAX` aggregation function.

  ```json
  "mappings": [
      {
          "columnName": "city",
          "expression": "UPPER(\"city\")"
      },
      {
          "columnName": "max_session_length",
          "expression": "MAX(\"session_length\")",
          "isAggregation": true
      }
  ]
  ```
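As referenced in the `ingestionMode` description above, the following is a minimal sketch of a replace-mode job spec, written as a Python dictionary to match the Python sample later in this topic. The table names come from this topic's examples; the interval values are illustrative only:

```python
# A replace-mode job spec sketch. The interval values are made up for
# illustration; replace them with the time range you want to overwrite.
replace_spec = {
    "type": "batch",
    "ingestionMode": "replace",
    "source": {
        "type": "table",
        "tableName": "Koalas Source",
        # Optional: limit the source snapshot to a specific time range.
        "interval": "2022-06-01T00:00:00Z/2022-06-01T08:00:00Z",
    },
    "target": {
        "type": "table",
        "tableName": "Koalas Rollup",
        # Required for replace mode: the interval(s) to overwrite.
        "intervals": ["2022-06-01T00:00:00Z/2022-06-01T08:00:00Z"],
    },
    "mappings": [
        {"columnName": "city", "expression": 'UPPER("city")'},
        {
            "columnName": "max_session_length",
            "expression": 'MAX("session_length")',
            "isAggregation": True,
        },
    ],
}
```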
Sample request
The following example shows how to load data from `Koalas Source` into `Koalas Rollup`:
cURL

```bash
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
"type": "batch",
"source": {
"type": "table",
"tableName": "Koalas Source"
},
"target": {
"type": "table",
"tableName": "Koalas Rollup"
},
"createTableIfNotExists": true,
"mappings": [
{
"columnName": "city",
"expression": "UPPER(\"city\")"
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
```

Python

```python
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"source": {
"type": "table",
"tableName": "Koalas Source"
},
"target": {
"type": "table",
"tableName": "Koalas Rollup"
},
"createTableIfNotExists": True,
"mappings": [
{
"columnName": "city",
"expression": "UPPER(\"city\")"
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
```
Sample response
The following example shows a response to a successful ingestion job launch:
```json
{
"source": {
"interval": "-1000-01-01T00:00:00.000Z/5000-01-01T00:00:00.000Z",
"tableName": "Koalas Source",
"type": "table"
},
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "city",
"expression": "UPPER(\"city\")",
"isAggregation": null
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"query": "INSERT INTO \"Koalas Rollup\"\nSELECT\n \"__time\" AS \"__time\",\n UPPER(\"city\") AS \"city\",\n MAX(\"session_length\") AS \"max_session_length\",\n COUNT(*) AS \"__count\"\nFROM \"Koalas Source\"\nWHERE (TIME_IN_INTERVAL(\"__time\", '0/5000-01-01T00:00:00.000Z'))\nGROUP BY 1, 2\nPARTITIONED BY DAY",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-09-13T22:31:47.800252712Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "018a90ac-0758-762b-b502-c5b1293302fe",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-09-13T22:31:47.800252712Z",
"spec": {
"source": {
"interval": "-1000-01-01T00:00:00.000Z/5000-01-01T00:00:00.000Z",
"tableName": "Koalas Source",
"type": "table"
},
"target": {
"tableName": "Koalas Rollup",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "city",
"expression": "UPPER(\"city\")",
"isAggregation": null
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "Koalas Rollup",
"type": "table",
"intervals": []
},
"type": "batch",
"completedTimestamp": null,
"startedTimestamp": null
}
```
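The `id` and `executionStatus` fields in the response identify the job and its state. As a minimal tracking sketch, assuming the Jobs v1 API serves individual jobs at `GET /v1/projects/PROJECT_ID/jobs/{id}` (confirm the endpoint and status values against the Jobs v1 API reference), you could poll the job by reusing `url`, `headers`, and `response` from the Python sample above:

```python
import time

job = response.json()  # response to the POST request above
job_url = f"{url}/{job['id']}"  # assumed per-job endpoint; see Jobs v1 API

# Poll until the job leaves the pending/running states. The "pending" and
# "running" values follow the executionStatus shown in the sample response.
while True:
    status = requests.get(job_url, headers=headers).json()["executionStatus"]
    print(f"Job {job['id']}: {status}")
    if status not in ("pending", "running"):
        break
    time.sleep(10)
```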
Learn more
See the following topics for more information:
- Jobs v1 API for reference on working with ingestion jobs in Polaris.
- Ingest data from files by API for information on monitoring or cancelling a batch job.