Ingest data from a table

You can use the Jobs v1 API to load data from an existing Polaris table into another table.

For a list of all ingestion options, see Ingestion sources overview.

Common uses for table-to-table ingestion include:

  • Migrating data from a detail table to an aggregate table.
  • Aging out data for a specific time range.

When you perform a table-to-table batch ingestion, Polaris takes a snapshot of the data in the source table at the time the ingestion job launches. If there is ongoing stream ingestion to the source table, data that arrives after the snapshot is not ingested into the destination table.

Table-to-table ingestion supports the following modes:

  • append: Appends data to a table. If the source and destination are the same table, the job duplicates the existing data within the table.
  • replace: Replaces data for the specified interval in the destination table.

During table-to-table ingestion, you cannot drop the source table or delete data from it.

The source table for the example in this topic is a detail table, Koalas Source, with a schema that includes the following columns:

  • __time: the primary timestamp
  • city: a string dimension
  • session: a string dimension
  • session_length: a long dimension

To follow along, you can create the table and load the data from kttm-source-table.json.tar.gz.
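
For reference, a row in the source table might look like the following. The values here are illustrative, not taken from the actual dataset:

    {"__time": "2022-06-01T01:10:23Z", "city": "Helsinki", "session": "S1066", "session_length": 120}
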
The destination table is an aggregate table, Koalas Rollup, with the following schema:

  • __time: the primary timestamp
  • city: a string dimension
  • count: a long measure indicating the number of aggregated records
  • max_session_length: a long measure

The destination table omits the session column from the source table and adds a new count column. Because the aggregate table has rollup enabled, the destination table stores only the maximum session_length value per aggregated group instead of each individual session_length value.
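
The following sketch illustrates how rollup condenses rows, assuming hourly rollup granularity and ignoring column transformations. The values are made up for illustration:

    Source rows:
    {"__time": "2022-06-01T01:10:00Z", "city": "Helsinki", "session": "S1", "session_length": 120}
    {"__time": "2022-06-01T01:40:00Z", "city": "Helsinki", "session": "S2", "session_length": 95}

    Destination row:
    {"__time": "2022-06-01T01:00:00Z", "city": "Helsinki", "count": 2, "max_session_length": 120}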

Prerequisites

Before starting batch ingestion from a table, you need the following:

  • A table in Polaris with your source data. You cannot ingest from a source table that has a status of UNAVAILABLE.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the environment variable named POLARIS_API_KEY.
    For information about how to obtain an API key and assign permissions, see API key authentication.
    For more information on permissions, see Permissions reference.

You do not have to create the destination table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

The source table and the destination table can be the same table, but you cannot make changes to the underlying table schema in this case. For example, you can't drop a column. If you need to make schema changes, use a new table as the destination.

Load data from one table into another

Launch a batch ingestion job to migrate data from your source table to a destination table in Polaris.

To launch an ingestion job, submit a POST request to the Jobs v1 API and pass the job specification as a payload to the request. The job specification is a JSON object that requires the following fields:

  • type: String representing the type of job. Set this property to batch for batch ingestion.

  • source: Object describing the source of input data. Within the source object, set the type to table and the tableName to the source table name. The following example shows a source object for batch ingestion from a table:

    "source": {
    "type": "table",
    "tableName": "Koalas Source"
    }

    You can optionally supply an ISO 8601-formatted interval to limit ingestion to source data within a specific time range.
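
    For example, the following source object limits ingestion to a single day of source data. The interval value here is illustrative:

    "source": {
      "type": "table",
      "tableName": "Koalas Source",
      "interval": "2022-06-01T00:00:00Z/2022-06-02T00:00:00Z"
    }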

  • target: Object describing the destination table for ingested data. Within the target object, set the type to table and specify the Polaris table name in tableName. For example:

    "target": {
    "type": "table",
    "tableName": "Koalas Rollup"
    }

    In the sample request later in this topic, createTableIfNotExists is set to true, which directs Polaris to automatically create the destination table if it doesn't exist.

  • ingestionMode: Either append or replace. Defaults to append. If the source and destination tables are the same, an append job duplicates the data. A replace job overwrites the data for the interval specified in the target.

    If you are using replace ingestion mode, also include intervals in the target property. Note that interval in the source is singular, whereas intervals in the target is plural. Specify one or more time intervals to replace in ISO 8601 format. For example:

     "target": {
    "type": "table",
    "tableName": "Koalas Rollup",
    "intervals": ["2022-06-01T00:00:00Z/2022-06-01T08:00:00Z"]
    }

    See Jobs v1 API for more detail on requirements for intervals.

  • mappings: Array describing how the columns from the source table map to the columns in the destination table. Enclose each input field within an expression in quotation marks. See Map and transform input fields for details and usage notes.

    For table-to-table ingestion, you can omit the timestamp field mapping from the source table. Polaris automatically detects the __time column.
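
    If you map the timestamp explicitly, the mapping is a simple passthrough. The following sketch is consistent with the SQL that Polaris generates in the sample response later in this topic:

    {
      "columnName": "__time",
      "expression": "\"__time\""
    }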

    The following mappings example demonstrates two kinds of mappings:

    • The city mapping applies a simple uppercase transformation.

    • The max_session_length mapping applies the MAX aggregation function.

       "mappings": [
      {
      "columnName": "city",
      "expression": "UPPER(\"city\")"
      },
      {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
      }
      ]

Sample request

The following example shows how to load data from Koalas Source into Koalas Rollup:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
  "type": "batch",
  "source": {
    "type": "table",
    "tableName": "Koalas Source"
  },
  "target": {
    "type": "table",
    "tableName": "Koalas Rollup"
  },
  "createTableIfNotExists": true,
  "mappings": [
    {
      "columnName": "city",
      "expression": "UPPER(\"city\")"
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'

Sample response

The following example shows a response to a successful ingestion job launch:

{
  "source": {
    "interval": "-1000-01-01T00:00:00.000Z/5000-01-01T00:00:00.000Z",
    "tableName": "Koalas Source",
    "type": "table"
  },
  "filterExpression": null,
  "ingestionMode": "append",
  "mappings": [
    {
      "columnName": "city",
      "expression": "UPPER(\"city\")",
      "isAggregation": null
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    },
    {
      "columnName": "__count",
      "expression": "COUNT(*)",
      "isAggregation": true
    }
  ],
  "maxParseExceptions": 2147483647,
  "query": "INSERT INTO \"Koalas Rollup\"\nSELECT\n \"__time\" AS \"__time\",\n UPPER(\"city\") AS \"city\",\n MAX(\"session_length\") AS \"max_session_length\",\n COUNT(*) AS \"__count\"\nFROM \"Koalas Source\"\nWHERE (TIME_IN_INTERVAL(\"__time\", '0/5000-01-01T00:00:00.000Z'))\nGROUP BY 1, 2\nPARTITIONED BY DAY",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2023-09-13T22:31:47.800252712Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "018a90ac-0758-762b-b502-c5b1293302fe",
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2023-09-13T22:31:47.800252712Z",
  "spec": {
    "source": {
      "interval": "-1000-01-01T00:00:00.000Z/5000-01-01T00:00:00.000Z",
      "tableName": "Koalas Source",
      "type": "table"
    },
    "target": {
      "tableName": "Koalas Rollup",
      "type": "table",
      "intervals": []
    },
    "createTableIfNotExists": true,
    "filterExpression": null,
    "ingestionMode": "append",
    "mappings": [
      {
        "columnName": "city",
        "expression": "UPPER(\"city\")",
        "isAggregation": null
      },
      {
        "columnName": "max_session_length",
        "expression": "MAX(\"session_length\")",
        "isAggregation": true
      },
      {
        "columnName": "__count",
        "expression": "COUNT(*)",
        "isAggregation": true
      }
    ],
    "maxParseExceptions": 2147483647,
    "type": "batch",
    "desiredExecutionStatus": "running"
  },
  "target": {
    "tableName": "Koalas Rollup",
    "type": "table",
    "intervals": []
  },
  "type": "batch",
  "completedTimestamp": null,
  "startedTimestamp": null
}
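
The launch response includes the job id and an executionStatus of pending. To track progress, you can poll the Jobs v1 API for the job until executionStatus reaches a terminal state. The following sketch assumes the get-job-details endpoint and reuses the id from the sample response; see Jobs v1 API for the exact path and response fields:

curl --location --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/018a90ac-0758-762b-b502-c5b1293302fe" \
--header "Authorization: Basic $POLARIS_API_KEY"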

Learn more

See the following topics for more information: