SQL ingestion reference

This topic provides a syntax reference for SQL-based ingestion jobs. For information on requirements and how to submit an ingestion job using SQL, see Ingest using SQL.

General syntax

A query for SQL-based ingestion typically looks like the following:

INSERT INTO "your_table"
SELECT
TIME_FLOOR(TIME_PARSE("TIMESTAMP_COLUMN"), 'PT1H') AS "__time",
"column1",
"column2",
SUM("column3") AS "sum_column3"
FROM TABLE(
FUNCTION(
...
)
)
GROUP BY 1, 2, 3
PARTITIONED BY UNIT_OF_TIME
CLUSTERED BY "column1"

INSERT|REPLACE INTO your_table

Use INSERT to insert new data. Use REPLACE to overwrite existing data.

When using REPLACE, specify the time interval of data in the table that you want to overwrite. Use OVERWRITE ALL to replace all existing data, or bound the interval by timestamps with OVERWRITE WHERE:

OVERWRITE WHERE "__time" >= TIMESTAMP '2019-08-25 02:00:00' AND "__time" < TIMESTAMP '2019-08-25 03:00:00'

When replacing data over a specific time interval, keep the following points in mind:

  • The destination time interval in OVERWRITE WHERE is automatically added to filter your source data.
  • The time interval must align with the time partitioning set in PARTITIONED BY.
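
For example, the following sketch replaces a single hour of data sourced from another Polaris table. The table and column names are hypothetical; note that the OVERWRITE WHERE interval aligns with PARTITIONED BY HOUR:

REPLACE INTO "example_table"
OVERWRITE WHERE "__time" >= TIMESTAMP '2019-08-25 02:00:00' AND "__time" < TIMESTAMP '2019-08-25 03:00:00'
SELECT
"__time",
"column1",
"column2"
FROM "example_source_table"
PARTITIONED BY HOUR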

For more information, see the following:

Creating aggregate tables with INSERT or REPLACE INTO

If you create a new table using SQL INSERT or REPLACE with createTableIfNotExists: true, Polaris includes a __count column in the table schema but leaves its values as NULL.

If you want to populate the __count column, include it in your SELECT statement. For example:

REPLACE INTO "koalas_with_rollup" OVERWRITE ALL
SELECT
FLOOR(TIME_PARSE("timestamp") TO HOUR) AS "__time",
"city" AS "city",
SUM("session_length") AS "session_length",
COUNT(*) AS "__count"
FROM TABLE(
POLARIS_SOURCE('{
"fileList":["kttm-2019-08-19.json.gz", "kttm-2019-08-20.json.gz"],
"inputSchema":[
{"dataType":"string","name":"timestamp"},
{"dataType":"string","name":"city"},
{"dataType":"long","name":"session_length"} ],
"formatSettings":{"format":"nd-json"},
"type":"uploaded"}'
)
)
GROUP BY 1,2
PARTITIONED BY DAY

If you don't need the __count column, you can undeclare it in the table schema after table creation. Polaris subsequently removes the column.

SELECT

The columns you want to ingest. As with other ingestion methods, you must explicitly designate one column to act as the time column.

Specify any input expressions to transform your data in the SELECT statement. For more information about input expressions, see Map and transform your data with input expressions.
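
For example, a minimal sketch of input expressions in the SELECT list; the source column names here are hypothetical:

SELECT
TIME_PARSE("timestamp") AS "__time",
UPPER("country") AS "country",
"bytes_in" + "bytes_out" AS "total_bytes"
FROM TABLE(
...
)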

FROM TABLE(FUNCTION())

Use one of the following table functions within the TABLE() function in your SQL statement to specify the source of your input data:

  • EXTERN: For inline data provided directly in the SQL statement. In contrast to using EXTERN in Apache Druid, Polaris does not support reading external data using EXTERN.
  • POLARIS_UPLOADED: For files uploaded to Polaris. Only supports JSON or delimiter-separated data.
  • POLARIS_S3_CONNECTION: For S3 connections specifically. Only supports JSON or delimiter-separated data.
  • POLARIS_SOURCE: For connections or uploaded files. Supports all allowable source formats.

You do not use a table function when your source data is another Polaris table.

GROUP BY

Optional: Provide a GROUP BY clause to use rollup to pre-aggregate data during ingestion. This reduces storage footprint and improves performance.

The columns you provide in the GROUP BY clause become dimensions, and aggregation functions become metrics. Intermediate aggregations are stored in the generated segments, and further aggregation is done at query time.

info

To apply rollup beyond millisecond granularity, you must also wrap the __time mapping in the TIME_FLOOR function within the SELECT clause. Specify your rollup granularity as an ISO 8601-formatted duration in TIME_FLOOR. For example:

TIME_FLOOR(TIME_PARSE("timestamp"), 'PT1H') AS "__time"

You can also set the origin and time zone from which the period boundaries are evaluated. For example:

TIME_FLOOR(TIME_PARSE("timestamp"), 'PT1H', TIMESTAMP '1970-01-01T00:00:00Z', 'America/Los_Angeles') AS "__time"

For more information, see the following:

PARTITIONED BY

Optional: Defines how to partition the data in the table based on time. Defaults to DAY.

Time partitioning is a table-specific property you set when you create or update a table. If the job property doesn't match the table property, the PARTITIONED BY value overrides the table's property for the job only.

For more information, see the following:

CLUSTERED BY

Optional: Further organize data within time partitions by specifying one or more columns to cluster by.

The list of columns used for clustering is a table-specific property you set when you create or update a table. The CLUSTERED BY value overrides the table's property if they do not match.
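
For example, the following sketch clusters a single job's data by two hypothetical columns:

INSERT INTO "example_table"
SELECT
...
FROM TABLE(
...
)
PARTITIONED BY DAY
CLUSTERED BY "country", "city"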

For more information, see the following:

Table functions

Table functions are used in SQL statements to define the source of input data used in the ingestion job. You can only specify batch sources for SQL-based ingestion.

In the source data specification, provide the schema of the input data using the EXTEND clause. Express the columns using SQL CREATE TABLE syntax: column1 dataType, column2 dataType, ....

For example:

INSERT INTO "example_table"
SELECT
...
FROM TABLE(
POLARIS_UPLOADED(...)
) EXTEND("channel" VARCHAR, "isRobot" VARCHAR, "timestamp" VARCHAR)
PARTITIONED BY DAY

For the POLARIS_SOURCE table function, you specify the input schema in the inputSchema array of the ingestion job spec. POLARIS_SOURCE doesn't support the EXTEND clause.

Don't provide an input schema when ingesting from an existing table.

EXTERN

info

In contrast to using EXTERN in Apache Druid, Polaris does not support reading external data using EXTERN.

The EXTERN table function allows you to provide the data to ingest inline, directly in the SQL statement. If you want to skip the details, check out the example.

EXTERN uses the following syntax:

EXTERN(
'<Druid input source>',
'<Druid input format>',
'<row signature>'
)

In EXTERN, you specify the following:

  1. An inline input source as a JSON-encoded string, such as '{"type":"inline","data":"{\"timestamp\": 1681794225551}\n{\"timestamp\": 1681794225558}"}'
  2. The input format as a JSON-encoded string, such as '{"type":"json"}'
  3. The row signature as a JSON-encoded array of column descriptors. Each column descriptor must have a name and a type. The type can be string, long, double, or float.

For example:

FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"timestamp\": 1681794225551}\n{\"timestamp\": 1681794225558}"}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"}]'
)
)

For more information, see EXTERN reference.

POLARIS_UPLOADED

To ingest data from files uploaded to Polaris, use the POLARIS_UPLOADED table function. For information on uploading files, see Upload a file and view sample data. If you want to skip the details, check out the example.

POLARIS_UPLOADED uses the following syntax:

POLARIS_UPLOADED(
files => ARRAY['example-file1', 'example-file2'],
format => 'example-format'
)

POLARIS_UPLOADED requires the following parameters:

  • files: An array of the files to ingest. The files must already be uploaded to Polaris.

  • format: Format of the data to ingest. All files for an ingestion job must have the same format. Supported values: json, csv, tsv

    To ingest data in other supported formats, use the POLARIS_SOURCE table function. For an example, see Generic source.

    For delimiter-separated format using a custom delimiter, set format => 'tsv', and define the delimiter as an additional parameter in POLARIS_UPLOADED.

    With the csv or tsv formats, you can also specify skipHeaderRows to designate the number of initial rows to skip, and listDelimiter to set a custom delimiter for multi-value dimensions.

The following example shows the syntax for delimiter-separated data:

POLARIS_UPLOADED(
files => ARRAY['file01.tsv', 'file02.tsv'],
format => 'tsv',
delimiter => '|',
skipHeaderRows => 1,
listDelimiter => ';'
)

POLARIS_S3_CONNECTION

To ingest data from an S3 connection, use the POLARIS_S3_CONNECTION table function. For information about creating an S3 connection, see Connect to S3. If you want to skip the details, check out the example.

POLARIS_S3_CONNECTION uses the following syntax:

POLARIS_S3_CONNECTION(
connectionName => 'example-connection',
[ uris => ARRAY['example-uris'], ]
[ prefixes => ARRAY['example-prefixes'], ]
[ objects => ARRAY['example-objects'], ]
[ pattern => 'example-pattern', ]
format => 'example-format'
)

In POLARIS_S3_CONNECTION, you specify the following:

  • connectionName: Name of the S3 connection. You must create a connection before you can use it in a SQL-based ingestion job. For reference on creating an S3 connection, see Connect to S3.

  • One of the following descriptors, designating which objects to ingest from the S3 bucket:

    • uris
    • prefixes
    • objects
    • pattern

    For more details and examples, see Ingest data from Amazon S3 by API.

  • format: Format of the data to ingest. All files for an ingestion job must have the same format. Supported values: json, csv, tsv

    To ingest data in other supported formats, use the POLARIS_SOURCE table function. For an example, see Generic source.

POLARIS_SOURCE

POLARIS_SOURCE is the most general way to specify the source data for an ingestion job. POLARIS_SOURCE accepts any batch ingestion source that Polaris supports. Use this function when your source data is in a format other than newline-delimited JSON or delimiter-separated values such as CSV or TSV. If you want to skip the details, check out the example.

POLARIS_SOURCE uses the following syntax:

POLARIS_SOURCE(
source => 'example-job-source'
)

For the source parameter, supply a JSON object that describes the source of data for the ingestion job. The JSON object is described by the same fields for source in an ingestion job spec.

The following example shows a source object:

{
"type": "uploaded",
"fileList": ["example-file.json"],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "channel"
}
]
}

You can convert the source object into a single-line string to use in the SQL statement. For example, if inputSource.json contains the preceding example, convert the JSON into a one-line string using a command like the following:

cat inputSource.json | jq -c
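
You can then paste the resulting single-line string into the table function. For example, using the preceding source object:

FROM TABLE(
POLARIS_SOURCE(
'{"type":"uploaded","fileList":["example-file.json"],"formatSettings":{"format":"nd-json"},"inputSchema":[{"dataType":"string","name":"timestamp"},{"dataType":"string","name":"channel"}]}'
)
)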

Examples based on data source

This section presents examples of SQL statements from various sources of data for batch ingestion.

A table

You do not need a table function when your source data comes from an existing table in Polaris.

The following example shows how to ingest from example_table1 to example_table2:

INSERT INTO "example_table2"
SELECT *
FROM "example_table1"
PARTITIONED BY DAY

Inline data

Use the EXTERN table function to ingest from inline data provided in the SQL statement. In contrast to using EXTERN in Apache Druid, Polaris does not support reading external data using EXTERN.

Consider the following newline-delimited JSON data:

{"timestamp": 1681794225551, "str_dim": "day1", "double_measure1": 5, "double_measure2": 15.5}
{"timestamp": 1681794225558, "str_dim": "day1", "double_measure1": 5, "double_measure2": 15.5}

The row signature contains the following column descriptors:

[
{
"name": "timestamp",
"type": "long"
},
{
"name": "str_dim",
"type": "string"
},
{
"name": "double_measure1",
"type": "double"
},
{
"name": "double_measure2",
"type": "double"
}
]

The following example shows how to ingest from JSON data provided inline in the SQL statement:

INSERT INTO "example_table"
WITH "inline_data" AS (
SELECT * FROM TABLE(
EXTERN(
'{"type":"inline", "data":"{\"timestamp\": 1681794225551, \"str_dim\": \"day1\", \"double_measure1\": 5, \"double_measure2\": 15.5}\n{\"timestamp\": 1681794225558, \"str_dim\": \"day1\", \"double_measure1\": 5, \"double_measure2\": 15.5}"}',
'{"type":"json"}',
'[{"name":"timestamp","type":"long"},{"name":"str_dim","type":"string"},{"name":"double_measure1","type":"double"},{"name":"double_measure2","type":"double"}]'
)
))

SELECT
MILLIS_TO_TIMESTAMP("timestamp") as "__time",
"str_dim",
"double_measure1",
"double_measure2"
FROM "inline_data"
PARTITIONED BY DAY

To submit the preceding SQL query by API, escape the quotation marks so that the request body is valid JSON. For example:

{
"type": "sql",
"query": "INSERT INTO example_table WITH inline_data AS (SELECT * FROM TABLE(EXTERN('{\"type\":\"inline\", \"data\":\"{\"timestamp\": 1681794225551, \"str_dim\": \"day1\", \"double_measure1\": 5, \"double_measure2\": 15.5}\n{\"timestamp\": 1681794225558, \"str_dim\": \"day1\", \"double_measure1\": 5, \"double_measure2\": 15.5}\"}', '{\"type\":\"json\"}', '[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"str_dim\",\"type\":\"string\"},{\"name\":\"double_measure1\",\"type\":\"double\"},{\"name\":\"double_measure2\",\"type\":\"double\"}]'))) SELECT MILLIS_TO_TIMESTAMP(\"timestamp\") as __time, str_dim, double_measure1, double_measure2 FROM inline_data PARTITIONED BY DAY"
}

Uploaded files

The following example shows how to use the POLARIS_UPLOADED table function to ingest data from uploaded files:

INSERT INTO "example_table"
SELECT
TIME_PARSE("timestamp") AS "__time",
*
FROM TABLE(
POLARIS_UPLOADED(
files => ARRAY['wikipedia.json.gz'],
format => 'json'
)
) EXTEND("channel" VARCHAR, "isRobot" VARCHAR, "timestamp" VARCHAR)
PARTITIONED BY DAY

If your uploaded data is in a format other than JSON or delimiter-separated values, use POLARIS_SOURCE instead.

An S3 connection

The following example shows how to use the POLARIS_S3_CONNECTION table function to ingest data from an S3 connection:

INSERT INTO "example_table"
SELECT
TIME_PARSE("timestamp") AS "__time",
"page",
"namespace"
FROM TABLE(
POLARIS_S3_CONNECTION(
connectionName => 'example-s3-connection',
uris => ARRAY['s3://foo/bar/file.json'],
format => 'json'
)
) EXTEND(page VARCHAR, namespace VARCHAR, "timestamp" VARCHAR)
PARTITIONED BY HOUR
CLUSTERED BY "namespace"

For details and more examples of selecting files from an S3 connection, see the following:

If the data in your S3 connection is in a format other than JSON or delimiter-separated values, use POLARIS_SOURCE instead.

Generic source

The following example shows how to use the POLARIS_SOURCE table function to ingest data from an S3 connection:

INSERT INTO "example_table"
SELECT
TIME_PARSE("timestamp") AS "__time",
*
FROM TABLE(
POLARIS_SOURCE(
'{"type": "s3", "connectionName": "my_s3_connection", "pattern": "files/**.json", "formatSettings": {"format": "nd-json"}, "inputSchema":[{"dataType":"string","name":"timestamp"},{"dataType":"string","name":"channel"}]}'
)
)
PARTITIONED BY HOUR

You can also specify format settings for other formats, such as ORC or Avro OCF. The following example uses POLARIS_SOURCE to ingest data from uploaded files in ORC format:

INSERT INTO "example_table"
SELECT
TIME_PARSE("timestamp") AS "__time",
*
FROM TABLE(
POLARIS_SOURCE(
'{"type": "uploaded", "fileList": ["example-file.orc"], "formatSettings": {"format": "orc", "binaryAsString": false}, "inputSchema":[{"dataType":"string","name":"timestamp"},{"dataType":"string","name":"channel"}]}'
)
)
PARTITIONED BY HOUR

Context parameters

With SQL-based ingestion jobs, you can set context parameters to configure query planning. For the available context parameters, see the following topics:

Polaris updates the default value from Apache Druid for the following context parameters:

  • durableShuffleStorage: true.
  • faultTolerance: true.
  • maxNumTasks: 75. Polaris restricts the allowable values from 2 to 75.

Polaris restricts the following context parameters:

  • finalizeAggregations: Supported value is true for detail tables and false for aggregate tables. For unsupported values, Polaris raises an error in the ingestion job.
  • groupByEnableMultiValueUnnesting: Supported value is false. Polaris overrides unsupported values without failing.
  • rowsInMemory: Not supported. Polaris ignores this context parameter.
  • taskAssignment: Supported value is auto. Polaris overrides unsupported values without failing.

More info about context parameters

Polaris manages context parameters for you in a way that fits most use cases. You don't need to use them unless you have a use case in mind that may benefit from changing Polaris's out-of-the-box behavior.

For example, the rowsPerSegment context parameter helps determine how many rows are in a single segment. It's not a hard rule and only serves as a general guideline.
Polaris can generate segments smaller or larger than the value you set based on other factors. Polaris uses a default that fits most scenarios. For certain use cases though, more or fewer rows for a segment can improve performance. Evaluate your segment details to determine if you need larger segments.
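
For example, the following sketch shows how you might set context parameters in a SQL-based ingestion job spec, assuming the spec accepts a context object alongside the query; the values are illustrative only:

{
"type": "sql",
"query": "INSERT INTO ...",
"context": {
"maxNumTasks": 10,
"rowsPerSegment": 3000000
}
}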

Limitations

  • With SQL-based ingestion for Apache Druid, INSERT statements can create a new datasource for you. In Polaris, you must create a table before you can load data into it using INSERT statements.
  • Polaris restricts certain context parameters. See Context parameters for more information.
  • The UI does not support multiple sources for an ingestion job. However, you can specify multiple sources that you combine using the JOIN operator in your SQL statement. For an example, see INSERT with JOIN.
  • For other limitations of SQL-based ingestion, see SQL-based ingestion known issues.

Learn more

See the following topics for more information: