Create an ingestion job

In Imply Polaris, you can launch an ingestion job to load data into a table. Within your ingestion job, select the source of data, specify the destination table, and map the input fields from the source to the table columns. You can use the Polaris UI or the Jobs v1 API to launch and manage ingestion jobs. If the input schema is different from the table schema, you can transform input data while mapping input fields to table columns during ingestion.

This topic covers the basics of ingestion job creation and the options for transforming source data in the Polaris UI. For information on launching ingestion jobs programmatically, see Create an ingestion job by API. To use SQL to define your batch ingestion job, see Ingest using SQL. See Manage ingestion jobs for details on viewing, canceling, and recreating ingestion jobs.

Prerequisites

Users with the ManageIngestionJobs permission and members of the Organization Admin, Project Admin, or Data Manager groups can create ingestion jobs. For information on permissions, see Permissions reference.

Ingest data

To launch an ingestion job from the Polaris console, do the following:

  1. If you haven't already, create a new data source. See Ingestion sources for information on the available sources and their requirements, and see Create a connection for instructions on creating a connection.

  2. From the Jobs tab, select Create job > Insert data. Select the table to ingest data into. If the table doesn’t exist, click New table and provide the name of the table. Polaris determines the appropriate table type and schema mode from the ingestion job. For more details, see Create a table.

  3. Select the source of data for ingestion, then click Next.

    Select data source

  4. Polaris parses your data and displays a sample output. You can correct data types, add missing input fields, or remove unnecessary fields. You can configure how Polaris parses delimiter-separated or nested data in this stage. Then click Continue.

    Parse data

  5. Next, map your input fields to your table columns.

    1. Use the column editor to change column names and data types and map and transform input data.

      For automatically created tables, the mappings determine the table type. If the mappings include an aggregation function, Polaris creates an aggregate table; otherwise, Polaris creates a detail table. Because the table type cannot be changed after ingestion, confirm the type before launching the first ingestion job. If your mappings do not include any aggregation functions but you want an aggregate table, create the table before starting the ingestion job.

    2. To set the rollup granularity for an aggregate table, click the primary timestamp column and select Edit.

    3. In the menu bar above the column editor, you can access additional schema editing options.

    Map source to table

  6. Click Start ingestion to launch the ingestion job.

To learn how to stop an ingestion job, see Cancel an ingestion job.

Map and transform data with input expressions

If your input schema is different from your table schema, you can transform the input fields during ingestion using mappings. Mappings describe the relationship between the input fields and the output columns of a table's schema. A mapping takes the name of the output column as well as an input expression.

An input expression in Polaris is an Apache Druid SQL expression that describes how to compute the value of a table column from a set of input fields. An input expression can be as simple as a single input field, or a combination of input fields and SQL operations. Input expressions can only refer to fields present in the actual input rows; in particular, they cannot refer to other expressions.

The simplest expression is the identity mapping, in which the input field is ingested without transformation into a dimension column. In this case, the expression is just the name of the input field, for example, "inputField".
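
For example, assuming hypothetical input fields named city, firstName, and lastName, the first expression below is an identity mapping into a dimension column, and the second computes a new dimension value from two fields:

    "city"
    CONCAT("firstName", ' ', "lastName")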

For streaming ingestion jobs into flexible tables, you can apply schema auto-discovery. With this feature, Polaris automatically discovers the input fields and maps them to table dimensions using the identity mapping. Otherwise, you must list the input field and mapping for each field you want to ingest. When schema auto-discovery is not enabled, Polaris does not ingest unmapped source fields, even if a field in the input schema has the same name as a table column.

Functions for input expressions

Input expressions for dimension columns support Druid SQL operators and scalar functions.

If the output column is a measure, the input expression must use one of the following aggregation functions:

  • COUNT(*): Counts the number of rows.
  • SUM(expr): Sums numbers.
  • MIN(expr): Takes the minimum of numbers.
  • MAX(expr): Takes the maximum of numbers.
  • Functions to measure dispersion:
    • VARIANCE(expr) or VAR_SAMP(expr): Computes the sample variance of numbers.
    • VAR_POP(expr): Computes the population variance of numbers.
    • STDDEV(expr) or STDDEV_SAMP(expr): Computes the sample standard deviation of numbers.
    • STDDEV_POP(expr): Computes the population standard deviation of numbers.
  • Functions to estimate distinct counts:
    • COUNT(DISTINCT expr): Counts the number of distinct values of expr. In Polaris, this is the same as APPROX_COUNT_DISTINCT and APPROX_COUNT_DISTINCT_DS_HLL. Polaris uses the HLL approximation algorithm to estimate distinct values. To compute distinct count estimates from sketches, see Query sketched data.
    • APPROX_COUNT_DISTINCT_DS_THETA(expr): Counts the number of distinct values of expr using Theta sketches. Theta sketches require more memory than HLL sketches.
  • Sketch creation functions. For more information, see Cardinality sketches and Quantiles sketches.
    • DS_HLL(expr, [lgK, tgtHllType]): Creates an HLL sketch on the values of the expression.
    • DS_THETA(expr, [size]): Creates a Theta sketch on the values of the expression.
    • DS_QUANTILES_SKETCH(expr, [k]): Creates a Quantiles sketch on the values of the expression.
  • Functions to aggregate based on time. For more information, see Aggregate value on ingestion based on earliest or latest time.
    • EARLIEST_BY(expr, timestampExpr, maxBytesPerString): For the earliest time value in timestampExpr, returns the associated value of a string or numeric expression expr.
    • LATEST_BY(expr, timestampExpr, maxBytesPerString): For the latest time value in timestampExpr, returns the associated value of a string or numeric expression expr.
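
For example, the following expressions could populate measure columns in an aggregate table. The field names price, bytes, and userId are hypothetical:

    SUM("price")
    MAX("bytes")
    COUNT(DISTINCT "userId")
    DS_THETA("userId")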

For more information, see Druid SQL aggregation functions.

Reference time in an input expression

Polaris always stores the primary timestamp in the __time column of your table. Use an input expression to parse, and optionally transform, your input time field to map to the __time column. You must always include a mapping for __time and declare the input fields on which the mapping depends. For more information and examples, see Timestamp expressions.
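
For example, assuming a hypothetical input field named timestamp that contains ISO 8601 strings, and another named epoch_millis that contains milliseconds since the Unix epoch, either of the following expressions could map the input time to __time:

    TIME_PARSE("timestamp")
    MILLIS_TO_TIMESTAMP("epoch_millis")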

info

If the source data has null or unparseable timestamps, Polaris fails the ingestion job. Either assign a default timestamp using an input expression, or filter out null timestamps using an ingestion filter.
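
For example, one way to assign a default timestamp is to wrap the parsing expression in COALESCE so that null or unparseable values fall back to a fixed time. The field name timestamp and the fallback date below are placeholders:

    COALESCE(TIME_PARSE("timestamp"), TIMESTAMP '2000-01-01 00:00:00')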

Usage notes for input expressions

Consider the following implementation details when mapping your input fields to columns in a Polaris table:

  • Enclose each input field within an expression in double quotation marks, such as "inputField1" + "inputField2".
  • Polaris stores null values when data types cannot be resolved, for example, when ingesting the string "myString" into a numeric column.
  • To ingest multi-value dimensions into aggregate tables, ensure the data is in a string-typed field and use the input expression MV_TO_ARRAY(). For example, MV_TO_ARRAY("mvdField"). Multi-value dimensions for detail tables do not require an input expression.
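
For example, the following expressions follow these notes; the field names are hypothetical. The first multiplies two quoted numeric input fields, and the second prepares a multi-value string field for ingestion into an aggregate table:

    "quantity" * "unitPrice"
    MV_TO_ARRAY("tags")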

Ingestion example

The following example shows how to map input fields to output columns with input expressions while transforming some of the source data.

Before you start, you must:

  • Download the docs-data.json sample dataset.
  • Upload the docs-data.json file to the staging area in Polaris.

Before ingestion, make the following modifications in the map schema view:

  1. Hold the pointer over the __time column, then click Edit. Concatenate the date and time input fields and parse the result as an ISO 8601 timestamp using this input expression:

    TIME_PARSE(CONCAT("date", 'T', "time"))

    Column edit

  2. Hold the pointer over the time column, then click Edit > Remove.

  3. Hold the pointer over the neighborhood input field, then click Edit. Convert the lowercase data to uppercase with this input expression:

    UPPER("neighborhood")
  4. Click Add column to add a new dimension column. Name the column next_inspection and enter the following input expression:

    TIMESTAMPADD(YEAR, 1, TIME_PARSE(CONCAT("date", 'T', "time")))

    Add dimension column

    Click Add. The expression adds a year to the primary timestamp and returns a new timestamp in milliseconds.

  5. Click Start ingestion to launch the ingestion job.

  6. Once the job is complete, inspect the table. Note how the __time column is constructed from the date and time input fields, and note the new next_inspection column, which was not present in the original dataset.

    The following screenshot shows the ingested table:

    Ingested table

Learn more

See the following topics for more information: