# Reference
The Multi-Stage Query (MSQ) Framework is a preview feature available starting in Imply 2022.06. Preview features enable early adopters to benefit from new functionality while providing ongoing feedback to help shape and evolve the feature. All functionality documented on this page is subject to change or removal in future releases. Preview features are provided "as is" and are not subject to Imply SLAs.
## Context parameters
In addition to the Druid SQL context parameters, the Multi-Stage Query (MSQ) framework supports context parameters that are specific to it.
You provide context parameters alongside your queries to customize the behavior of the query. The way you provide the parameters depends on how you submit your query:
- Druid console: Add your parameters before your query like so:

  ```sql
  --:context <key>: <value>
  --:context msqFinalizeAggregations: false
  INSERT INTO myTable ...
  ```

- API: Add them in a `context` section of your JSON object like so:

  ```json
  {
    "query": "SELECT 1 + 1",
    "context": {
      "<key>": <value>,
      "msqMaxNumTasks": 3
    }
  }
  ```
Parameter | Description | Default value |
---|---|---|
`msqMaxNumTasks` | (SELECT, INSERT, or REPLACE) MSQ executes queries using the indexing service, that is, the Overlord plus MiddleManagers. This property specifies the maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after about 10 minutes.<br /><br />May also be provided as `msqNumTasks`. If both are present, `msqMaxNumTasks` takes priority. | 2 |
`msqTaskAssignment` | (SELECT, INSERT, or REPLACE) Determines how the number of tasks is chosen:<br /><br />`max`: Use as many tasks as possible, up to the maximum `msqMaxNumTasks`.<br />`auto`: Use as few tasks as possible without exceeding 10 GiB or 10,000 files per task. Review the limitations of `auto` mode before using it. | `max` |
`msqFinalizeAggregations` | (SELECT, INSERT, or REPLACE) Whether Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than the finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see SQL aggregation functions. | true |
`msqRowsInMemory` | (INSERT or REPLACE) Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default. You may need to override this if you run into one of the current known issues around memory usage. | 100,000 |
`msqSegmentSortOrder` | (INSERT or REPLACE) Normally, Druid sorts rows in individual segments using `__time` first, followed by the CLUSTERED BY clause. When you set `msqSegmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`.<br /><br />For example, consider an INSERT query that uses `CLUSTERED BY country` and has `msqSegmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts rows by `__time` first, then `city`, then `country`. | empty list |
`maxParseExceptions` | (SELECT, INSERT, or REPLACE) Maximum number of parse exceptions to ignore while executing the query before it stops with `TooManyWarningsFault`. To ignore all parse exceptions, set the value to -1. | 0 |
`msqRowsPerSegment` | (INSERT or REPLACE) Target number of rows per segment. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see Segment Size Optimization. | 3,000,000 |
`msqDurableShuffleStorage` | (SELECT, INSERT, or REPLACE) Whether to use durable storage for the shuffle mesh. To use this feature, durable storage must be configured at the server level using `druid.msq.intermediate.storage.enable=true` and its associated properties. If these properties are not configured, any query that sets `msqDurableShuffleStorage=true` fails with a configuration error. | false |
`sqlTimeZone` | Sets the time zone for this connection, which affects how time functions and timestamp literals behave. Use a time zone name like "America/Los_Angeles" or an offset like "-08:00". | `druid.sql.planner.sqlTimeZone` on the Broker (default: UTC) |
`useApproximateCountDistinct` | Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`. | `druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true) |
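For example, setting `msqFinalizeAggregations` to false lets you store an aggregation's intermediate type, such as a sketch, at ingestion time. The following is a minimal sketch of how that might look in the Druid console, assuming the `druid-datasketches` extension is loaded; `myTable` reuses the name from the console example above, and `pageviews` is a hypothetical source table:

```sql
--:context msqFinalizeAggregations: false
--:context msqMaxNumTasks: 3
INSERT INTO myTable
SELECT
  TIME_FLOOR(__time, 'PT1H') AS __time,
  "page",
  -- Stored as an unfinalized theta sketch because msqFinalizeAggregations is false
  APPROX_COUNT_DISTINCT_DS_THETA("user") AS theta_users
FROM pageviews
GROUP BY 1, 2
PARTITIONED BY DAY
```

Because finalization is disabled, the `theta_users` column holds mergeable sketch objects rather than finalized numbers, so later queries can combine them across rows.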
## Error codes
MSQ errors are meant to be human readable and to provide you with enough information to troubleshoot failed queries.
## SQL syntax
MSQ has three primary SQL constructs:
- EXTERN
- INSERT
- REPLACE
For usage information and examples, see MSQ queries. For information about adjusting the shape of your data, see Adjust query behavior.
### EXTERN
Use the EXTERN function to read external data. It takes the following format:

```sql
SELECT
  <column>
FROM TABLE(
  EXTERN(
    '<Druid input source>',
    '<Druid input format>',
    '<row signature>'
  )
)
```
The EXTERN function takes three parameters:

- Any Druid input source, as a JSON-encoded string.
- Any Druid input format, as a JSON-encoded string.
- A row signature, as a JSON-encoded array of column descriptors. Each column descriptor must have a `name` and a `type`. The type can be `string`, `long`, `double`, or `float`. This row signature is used to map the external data into the SQL layer.
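For illustration, here is a hypothetical EXTERN call that reads gzipped JSON over HTTP; the URL and the `page` and `added` columns are placeholders, not values from this reference:

```sql
SELECT *
FROM TABLE(
  EXTERN(
    -- Druid input source: read one file over HTTP (placeholder URL)
    '{"type": "http", "uris": ["https://example.com/data.json.gz"]}',
    -- Druid input format: newline-delimited JSON
    '{"type": "json"}',
    -- Row signature: maps the external fields into SQL columns
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
```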
### INSERT

```sql
INSERT INTO <table name>
SELECT
  <column>
FROM <table>
PARTITIONED BY <time frame>
```
An INSERT query consists of the following parts:

- Optional context parameters.
- An `INSERT INTO <dataSource>` clause at the start of your query, such as `INSERT INTO w000`.
- A clause for the data you want to insert, such as `SELECT ... FROM TABLE ...`. You can use EXTERN to reference an external table using the following format: `TABLE(EXTERN(...))`.
- A PARTITIONED BY clause for your INSERT statement. For example, use `PARTITIONED BY DAY` for daily partitioning or `PARTITIONED BY ALL TIME` to skip time partitioning completely.
- An optional CLUSTERED BY clause.
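Putting those parts together, here is a sketch of an INSERT query that ingests the placeholder external data from the EXTERN example above into `w000`, with daily partitioning and optional clustering:

```sql
INSERT INTO w000
SELECT
  -- Parse the external string timestamp into Druid's __time column
  TIME_PARSE("timestamp") AS __time,
  "page",
  "added"
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
PARTITIONED BY DAY
CLUSTERED BY "page"
```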
### REPLACE

#### REPLACE all data

```sql
REPLACE INTO <target table>
OVERWRITE ALL
SELECT
  TIME_PARSE("timestamp") AS __time,
  *
FROM <source table>
PARTITIONED BY <time>
```
#### REPLACE specific data

```sql
REPLACE INTO <target table>
OVERWRITE WHERE __time >= TIMESTAMP '<lower bound>' AND __time < TIMESTAMP '<upper bound>'
SELECT
  TIME_PARSE("timestamp") AS __time,
  *
FROM <source table>
PARTITIONED BY <time>
```
A REPLACE query consists of the following parts:

- Optional context parameters.
- A `REPLACE INTO <dataSource>` clause at the start of your query, such as `REPLACE INTO w000`.
- An OVERWRITE clause after the datasource, either OVERWRITE ALL or OVERWRITE WHERE ...:
  - OVERWRITE ALL replaces the entire existing datasource with the results of the query.
  - OVERWRITE WHERE drops the time segments that match the condition you set. Conditions are based on the `__time` column and use the format `__time [< > = <= >=] TIMESTAMP`. Use them with AND, OR, and NOT between them. You can also use `BETWEEN TIMESTAMP AND TIMESTAMP`, which is inclusive of the timestamps specified. For an example, see REPLACE INTO ... OVERWRITE WHERE ... SELECT.
- A clause for the actual data you want to use for the replacement.
- A PARTITIONED BY clause for your REPLACE statement. For example, use `PARTITIONED BY DAY` for daily partitioning, or `PARTITIONED BY ALL TIME` to skip time partitioning completely.
- An optional CLUSTERED BY clause.
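As a concrete illustration, here is a sketch of a REPLACE query that rewrites a single day of data in place; `w000` reuses the table name from the examples above, and the dates are purely illustrative:

```sql
REPLACE INTO w000
-- Drop only the segments for this one day
OVERWRITE WHERE __time >= TIMESTAMP '2022-06-01' AND __time < TIMESTAMP '2022-06-02'
SELECT *
FROM w000
-- Select the same interval so every produced row falls inside the replaced time range
WHERE __time >= TIMESTAMP '2022-06-01' AND __time < TIMESTAMP '2022-06-02'
PARTITIONED BY DAY
```

Matching the SELECT filter to the OVERWRITE WHERE interval keeps all produced rows inside the range being overwritten, so no row lands outside the segments being replaced.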