Reference

The Multi-Stage Query (MSQ) Framework is a preview feature available starting in Imply 2022.06. Preview features enable early adopters to benefit from new functionality while providing ongoing feedback to help shape and evolve the feature. All functionality documented on this page is subject to change or removal in future releases. Preview features are provided "as is" and are not subject to Imply SLAs.

Context parameters

In addition to the Druid SQL context parameters, the Multi-Stage Query (MSQ) framework supports context parameters that are specific to MSQ.

You provide context parameters alongside your queries to customize the behavior of the query. The way you provide the parameters depends on how you submit your query:

  • Druid console: Add your parameters before your query like so:

    --:context <key>: <value> 
    --:context msqFinalizeAggregations: false
    INSERT INTO myTable
    ...
    
  • API: Add them in a context section of your JSON object like so:

    {
     "query": "SELECT 1 + 1",
     "context": {
         "<key>": <value>,
         "msqMaxNumTasks": 3
     }
    }
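    For example, assuming the MSQ task endpoint described on the API page (shown here as /druid/v2/sql/task; verify the exact path for your deployment), a curl submission might look like the following sketch:

    curl --request POST 'https://<host>/druid/v2/sql/task' \
      --header 'Content-Type: application/json' \
      --data '{
        "query": "SELECT 1 + 1",
        "context": {
            "msqMaxNumTasks": 3
        }
      }'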
    
MSQ supports the following context parameters. The query types in parentheses indicate which statements the parameter applies to.

msqMaxNumTasks (SELECT, INSERT, or REPLACE)

  MSQ executes queries using the indexing service, that is, the Overlord plus MiddleManagers. This property specifies the maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker.

  All tasks must be able to launch simultaneously. If they cannot, the query returns a TaskStartTimeout error code after about 10 minutes.

  May also be provided as msqNumTasks. If both are present, msqMaxNumTasks takes priority.

  Default value: 2

msqTaskAssignment (SELECT, INSERT, or REPLACE)

  Determines how the number of tasks is chosen:

  • max: use as many tasks as possible, up to msqMaxNumTasks.
  • auto: use as few tasks as possible without exceeding 10 GiB or 10,000 files per task. Review the limitations of auto mode before using it.

  Default value: max

msqFinalizeAggregations (SELECT, INSERT, or REPLACE)

  Whether Druid finalizes the results of complex aggregations that appear directly in query results. If false, Druid returns the aggregation's intermediate type rather than its finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see SQL aggregation functions.

  Default value: true

msqRowsInMemory (INSERT or REPLACE)

  Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, you should stick to the default. It may be necessary to override this if you run into one of the current known issues around memory usage.

  Default value: 100,000

msqSegmentSortOrder (INSERT or REPLACE)

  Normally, Druid sorts rows in individual segments using __time first, followed by the CLUSTERED BY clause. When msqSegmentSortOrder is set, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.

  Provide the column list as comma-separated values or as a JSON array in string form. If your query includes __time, then this list must begin with __time.

  For example, consider an INSERT query that uses CLUSTERED BY country and has msqSegmentSortOrder set to __time,city. Within each time chunk, Druid assigns rows to segments based on country, and then within each of those segments, Druid sorts the rows by __time first, then city, then country.

  Default value: empty list

maxParseExceptions (SELECT, INSERT, or REPLACE)

  Maximum number of parse exceptions to ignore while executing the query before it stops with a TooManyWarningsFault. To ignore all parse exceptions, set the value to -1.

  Default value: 0

msqRowsPerSegment (INSERT or REPLACE)

  Number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, you should stick to the default. For general information about sizing rows per segment, see Segment Size Optimization.

  Default value: 3,000,000

msqDurableShuffleStorage (SELECT, INSERT, or REPLACE)

  Whether to use durable storage for shuffle mesh. To use this feature, durable storage must be configured at the server level using druid.msq.intermediate.storage.enable=true and its associated properties. If these properties are not configured, any query that sets msqDurableShuffleStorage=true fails with a configuration error.

  Default value: false

sqlTimeZone

  Sets the time zone for this connection, which affects how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or an offset like "-08:00".

  Default value: druid.sql.planner.sqlTimeZone on the Broker (default: UTC)

useApproximateCountDistinct

  Whether to use an approximate cardinality algorithm for COUNT(DISTINCT foo).

  Default value: druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)
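For example, in the Druid console you might combine several of these parameters ahead of a hypothetical INSERT query, using the --:context syntax shown earlier (the table name and values are illustrative only):

--:context msqMaxNumTasks: 3
--:context msqRowsPerSegment: 5000000
INSERT INTO myTable
...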

Error codes

MSQ errors are meant to be human readable and to provide you with enough information to troubleshoot failed queries.

SQL syntax

MSQ SQL syntax has three primary components:

  • EXTERN
  • INSERT
  • REPLACE

For usage information and examples, see MSQ queries. For information about adjusting the shape of your data, see Adjust query behavior.

EXTERN

The EXTERN function reads external data and uses the following format:

SELECT
 <column>
FROM TABLE(
  EXTERN(
    '<Druid input source>',
    '<Druid input format>',
    '<row signature>'
  )
)

The EXTERN function uses three parameters:

  1. Any Druid input source as a JSON-encoded string.
  2. Any Druid input format as a JSON-encoded string.
  3. A row signature, as a JSON-encoded array of column descriptors. Each column descriptor must have a name and a type. The type can be string, long, double, or float. This row signature is used to map the external data into the SQL layer.
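For example, the following hypothetical query reads gzipped JSON data over HTTP; the URI and column names are placeholders for your own data:

SELECT
  *
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "count", "type": "long"}]'
  )
)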

INSERT

INSERT INTO <table name>
SELECT
  <column>
FROM <table>
PARTITIONED BY <time frame>

An INSERT query consists of the following parts:

  1. Optional context parameters.
  2. An INSERT INTO <dataSource> clause at the start of your query, such as INSERT INTO w000.
  3. A clause for the data you want to insert, such as SELECT ... FROM TABLE(...). You can use EXTERN to reference an external table in the format TABLE(EXTERN(...)), as shown in the example after this list.
  4. A PARTITIONED BY clause for your INSERT statement. For example, use PARTITIONED BY DAY for daily partitioning or PARTITIONED BY ALL TIME to skip time partitioning completely.
  5. An optional CLUSTERED BY clause.
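Putting these parts together, a hypothetical INSERT query that ingests external JSON data might look like the following; the datasource name, URI, and columns are illustrative:

INSERT INTO w000
SELECT
  TIME_PARSE("timestamp") AS __time,
  page,
  "count"
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "count", "type": "long"}]'
  )
)
PARTITIONED BY DAY
CLUSTERED BY page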

REPLACE

REPLACE all data

REPLACE INTO <target table>
OVERWRITE ALL
SELECT
  TIME_PARSE("timestamp") AS __time,
  *
FROM <source table>
PARTITIONED BY <time>

REPLACE specific data

REPLACE INTO <target table>
OVERWRITE WHERE __time >= TIMESTAMP '<lower bound>' AND __time < TIMESTAMP '<upper bound>'
SELECT
  TIME_PARSE("timestamp") AS __time,
  *
FROM <source table>
PARTITIONED BY <time>

A REPLACE query consists of the following parts:

  1. Optional context parameters.
  2. A REPLACE INTO <dataSource> clause at the start of your query, such as REPLACE INTO w000.
  3. An OVERWRITE clause after the datasource, either OVERWRITE ALL or OVERWRITE WHERE ...:
  • OVERWRITE ALL replaces the entire existing datasource with the results of the query.

  • OVERWRITE WHERE drops the time segments that match the condition you set. Conditions are based on the __time column and use the format __time [< > = <= >=] TIMESTAMP. Combine conditions with AND, OR, and NOT. You can also use BETWEEN TIMESTAMP AND TIMESTAMP, which is inclusive of the timestamps specified. For an example, see REPLACE INTO ... OVERWRITE WHERE ... SELECT.

  4. A clause for the actual data you want to use for the replacement.
  5. A PARTITIONED BY clause for your REPLACE statement. For example, use PARTITIONED BY DAY for daily partitioning, or PARTITIONED BY ALL TIME to skip time partitioning completely.
  6. An optional CLUSTERED BY clause. See the example after this list.
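As an illustration, a hypothetical REPLACE query that rewrites a single day of data in place, dropping rows for one page, might look like the following; the datasource name, timestamps, and filter are placeholders:

REPLACE INTO w000
OVERWRITE WHERE __time >= TIMESTAMP '2019-08-25' AND __time < TIMESTAMP '2019-08-26'
SELECT *
FROM w000
WHERE __time >= TIMESTAMP '2019-08-25' AND __time < TIMESTAMP '2019-08-26'
  AND page <> 'Main_Page'
PARTITIONED BY DAY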