The Multi-Stage Query (MSQ) Framework is a preview feature available starting in Imply 2022.06. Preview features enable early adopters to benefit from new functionality while providing ongoing feedback to help shape and evolve the feature. All functionality documented on this page is subject to change or removal in future releases. Preview features are provided "as is" and are not subject to Imply SLAs.
- There are two new context parameters:
  - msqMaxNumTasks: This property specifies the maximum number of tasks to launch, including the controller task. When both msqMaxNumTasks and the older msqNumTasks are present, msqMaxNumTasks takes precedence. The default value is 2, which is the lowest supported number of tasks: one controller and one worker. msqNumTasks will be removed in a future release. You can set this property in the console with the Max tasks option or include it as a context parameter.
  - msqTaskAssignment: This property determines how the number of tasks is chosen. You can set it to either max, where MSQ uses as many tasks as possible (up to the limit set by msqMaxNumTasks), or auto, where MSQ uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task. A sketch of a query context that sets both parameters appears after this list.
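For illustration only, the following is a minimal sketch of submitting a query with both new context parameters set programmatically. The host, endpoint, and SQL shown are placeholders and assumptions for this example, not part of the release note; check the API documentation for the endpoints your version supports.

```python
import requests  # third-party HTTP client, assumed to be installed

# Hypothetical submission of a multi-stage query with the new context
# parameters. Host, port, endpoint, and SQL are placeholders.
payload = {
    "query": 'INSERT INTO "example_table" SELECT * FROM "example_source" PARTITIONED BY DAY',
    "context": {
        "msqMaxNumTasks": 5,          # one controller plus up to four workers
        "msqTaskAssignment": "auto",  # use as few tasks as possible within per-task limits
    },
}

response = requests.post("http://ROUTER_HOST:8888/druid/v2/sql/task", json=payload)
response.raise_for_status()
print(response.json())  # the response includes the ID of the controller task
```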
- The maximum number of tasks control is now more prominently displayed near the Run button in the web console.
- MSQ now shows the following errors:
  - InvalidNullByte error if a string column includes a null byte, which is not allowed.
  - InsertTimeNull error if an INSERT or REPLACE query contains a null timestamp.
- Improved how the web console detects MSQ. The sql-task engine for MSQ is only available if the MSQ extension is configured correctly.
- The Connect external data wizard now auto-detects columns that are suitable to be parsed as the primary timestamp column.
- The web console can now recover from network errors / connectivity issues while running a query.
- You can now open a query detail archive generated in another cluster.
- Improved the counters. There is now one counter per input. The counters now contain more detail, are sorted, and are rendered as a table.
- Improved reporting for task cancellations and worker errors.
- Fixed the following bugs:
- Moving from one tab to another canceled a SQL query if one was already running.
- Counters reset to 0 under certain crash conditions.
- Certain errors were being reported incorrectly (for example, an invalid "Access key ID" in an S3 input source).
- The shuffle mesh used an excessive number of threads. Communication now happens in chunks, so you no longer need to set druid.server.http.numThreads to a very high number.
- While using durable storage, if a worker is killed for any reason, the controller still cleans up the stage outputs of all the workers as a fail-safe.
- The location of some of the documentation has changed. You may need to update your bookmarks. Additionally, the Multi-Stage Query Engine (MSQE) is now referred to as the Multi-Stage Query (MSQ) Framework. You may still see some references to MSQE.
Changes and improvements
- Property names have changed. Previously, properties used the talaria prefix; they now use either the multiStageQuery or the msq prefix, depending on the context. Note that the extension itself keeps its previous name.
- You no longer need a license that has an explicit entitlement for MSQE.
- MSQE now supports S3 as a storage medium when using mesh shuffles. For more information, see Durable storage for mesh shuffles.
- Reports for MSQE queries now include warning sections that provide information about the type of warning and number of occurrences.
- You can now use REPLACE in MSQE queries. For more information, see REPLACE.
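As a brief, non-authoritative sketch of the new capability (the table and column names below are made up and not from this release note), a REPLACE query that rewrites an entire datasource might look like this:

```python
# Illustrative only: the table and column names are placeholders.
# OVERWRITE ALL rewrites the entire target table; see the REPLACE
# documentation for the other supported forms.
replace_query = """
REPLACE INTO "website_visits"
OVERWRITE ALL
SELECT
  __time,
  channel,
  visit_count
FROM "website_visits_staging"
PARTITIONED BY DAY
"""
```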
- The default mode for MSQE is now strict, meaning that a query fails if there is a single malformed record. You can change this behavior using a query context parameter.
- The behavior of msqNumTasks (formerly talariaNumTasks) has changed. It now accurately reflects the total number of task slots required, including the controller. Previously, the controller was not accounted for.
- The EXTERN operator now requires READ access to the resource type EXTERNAL that is named EXTERNAL. This new permission must be added manually to API users who issue queries that access external data. In Imply Enterprise Hybrid and in Imply Enterprise, this permission is added automatically to users with the ManageDatasets permission. For Imply Enterprise, you must update Imply Manager to 2022.06 prior to updating your Imply version to 2022.06.
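As a non-authoritative sketch, the new permission corresponds to a resource/action pair like the following, shown here as a Python dictionary. How the permission is attached to a user or role depends on your authorizer and management tooling, which this example does not assume.

```python
# Shape of the permission that EXTERN now requires (illustrative only).
# How this permission is granted to a user or role depends on your
# authorizer and management tooling.
external_read_permission = {
    "resource": {
        "type": "EXTERNAL",  # resource type for external data access
        "name": "EXTERNAL",  # resource name; READ on it allows EXTERN usage
    },
    "action": "READ",
}
```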
- Queries that carry large amounts of data through multiple stages are sped up significantly due to performance enhancements in sorting and shuffling.
- Queries that use GROUP BY followed by ORDER BY, PARTITIONED BY, or CLUSTERED BY execute with one less stage, improving performance.
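For example, an ingestion query of the shape below (a sketch with made-up table and column names) combines GROUP BY with PARTITIONED BY and CLUSTERED BY, the pattern that now executes with one fewer stage:

```python
# Illustrative query shape only; the table and column names are placeholders.
# GROUP BY followed directly by PARTITIONED BY and CLUSTERED BY is the
# pattern that now plans with one fewer stage.
grouped_insert = """
INSERT INTO "page_views_rollup"
SELECT
  TIME_FLOOR(__time, 'PT1H') AS __time,
  page,
  COUNT(*) AS view_count
FROM "page_views"
GROUP BY 1, 2
PARTITIONED BY DAY
CLUSTERED BY page
"""
```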
- Strings are no longer permitted to contain null bytes. Strings containing null bytes will result in an InvalidNullByte error.
- INSERT or REPLACE queries with null values for __time now exit with an InsertTimeNull error.
- INSERT or REPLACE queries that are aborted due to lock preemption now exit with an InsertLockPreempted error. Previously, they would exit with an UnknownError.
- MSQE-generated segments can now be used with Pivot data cubes and support auto-compaction if the query conforms to the conditions defined in GROUP BY.
- You can now download the results of a query as a CSV, TSV, or JSON file. Click the download icon in the Query view after you run a query.
- Fixed an issue where the controller task would stall indefinitely when its worker tasks could not all start up. Now, the controller task exits with a TaskStartTimeout after ten minutes.
- Fixed an issue where the controller task would exit with an UnknownError when canceled. Now, the controller task correctly exits with a Canceled error.
- Fixed an issue where the error message shown when groupByEnableMultiValueUnnesting: false is set in the context of a query that uses GROUP BY referred to the wrong parameter, executingNestedQuery. The message now refers to the correct parameter, groupByEnableMultiValueUnnesting.
- Fixed an issue where the true error would sometimes be shadowed by a WorkerFailed error. (20466)
- Fixed an issue where tasks would not retry certain retryable Overlord API errors. Now, these errors are retried, improving reliability.
- You no longer need to load the imply-sql-async extension to use the Multi-Stage Query Engine; loading the MSQE extension itself is sufficient.
- The API endpoints have changed. Earlier versions of the Multi-Stage Query Engine used the /druid/v2/sql/async/ endpoint. The engine now uses different endpoints based on what you're trying to do, including the task endpoints under /druid/indexer/v1/task/. For more information, see API.
- You no longer need to set a context parameter for talaria when making API calls. API calls to the task endpoint use the Multi-Stage Query Engine automatically.
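As a rough sketch of working with the task-based endpoints, the status of a submitted query can be checked through the standard task status API. The host, port, and task ID below are placeholders; the exact endpoints available to you are described in the API documentation.

```python
import requests  # third-party HTTP client, assumed to be installed

# Placeholder values; substitute your Router or Overlord address and the
# task ID returned when the query was submitted.
BASE_URL = "http://ROUTER_HOST:8888"
task_id = "query-REPLACE-WITH-REAL-TASK-ID"

# Multi-stage queries run as tasks, so the standard task status endpoint
# reports whether the controller task is still running.
status = requests.get(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/status").json()
print(status["status"]["status"])  # for example RUNNING, SUCCESS, or FAILED
```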
- Fixed an issue that caused an IndexOutOfBoundsException error to occur, which led to some ingestion jobs failing.
- Stage outputs are now removed from local disk when no longer needed. This reduces the total amount of local disk space required by jobs with more than two stages. (15030)
- It is now possible to control segment sort order independently of CLUSTERED BY, using the talariaSegmentSortOrder context parameter. (18320)
- There is now a guardrail on the maximum number of input files. Exceeding this limit leads to a TooManyInputFiles error. (15020)
- Queries now report the error code WorkerRpcFailed when controller-to-worker or worker-to-worker communication fails. Previously, this would be reported as an UnknownError. (18971)
- Fixed an issue where queries with large numbers of partitions could run out of memory. (19162)
- Fixed an issue where worker errors were sometimes hidden by controller errors that cascaded from those worker failures. Now, the original worker error is preferred. (18971)
- Fixed an issue where worker errors were incompletely logged in worker task logs. (18964)
- Fixed an issue where workers could sometimes fail with an error soon after startup.
- Fixed an issue where worker tasks sometimes continued running after controller failures. (18052)
- The web console now includes counters for external input rows and files, Druid table input rows and segments, and sort progress. As part of this change, the query detail response format has changed. Clients performing programmatic access will need to be updated. (15048, 15208, 18070)
- Added the ability to avoid unnesting multi-value string dimensions during GROUP BY. This is useful for performing ingestion with rollup. (15031, 16875, 16887)
- EXPLAIN PLAN FOR now works properly on INSERT queries. (17321)
- External input files are now read in parallel when running in Indexers. (17933)
- Improved accuracy of partition-determination. Segments generated by INSERT are now more regularly sized. (17867)
- CannotParseExternalData error reports now include input file path and line number information. (16016)
- There is now an upper limit on the number of workers, partially determined by available memory. Exceeding this limit leads to a TooManyWorkers error. (15021)
- There is now a guardrail on the maximum size of data involved in a broadcast join. Queries that exceed the limit will report a BroadcastTablesTooLarge error code. (15024)
- When a worker fails abruptly, the controller now reports a WorkerTaskFailed error code instead of UnknownError. (15024)
- Controllers will no longer give up on workers before the Overlord does. Previously, the controller would fail with the message "Connection refused" if workers took longer than 30 seconds to start up. (17602)
- Fixed an issue where INSERT queries that generate large numbers of time chunks may fail with a message containing "SketchesArgumentException: K must be >= 2 and <= 32768 and a power of 2". This happened when the number of generated time chunks was close to the TooManyBuckets limit. (14764)
- Fixed an issue where queries with certain input sources would report an error with the message "Too many workers" when there were more files than workers. (18022)
- Fixed an issue where SELECT queries with LIMIT would sometimes return more rows than intended. (17394)
- Fixed an issue where workers could intermittently fail with an UnknownError with the message "Invalid midstream marker". (17602)
- Fixed an issue where workers could run out of memory when connecting to large numbers of other workers. (16153)
- Fixed an issue where workers could run out of memory during GROUP BY of large external input files. (17781)
- Fixed an issue where workers could retry reading the same external input file repeatedly and never succeed. (17936, 18009)
- INSERT uses PARTITIONED BY and CLUSTERED BY instead of talariaSegmentGranularity and ORDER BY. (15045)
- INSERT validates the datasource name at planning time instead of execution time. (15038)
- SELECT queries support OFFSET. (15000)
- The "Connect external data" feature in the Query view of the web console correctly supports Parquet files. Previously, this feature would report a "ParseException: Incorrect Regex" error on Parquet files. (16197)
- The Query detail API includes startTime and durationMs for the whole query. (15046)
- Multi-stage queries can now be issued using the async query API provided by the imply-sql-async extension.
- The talariaFinalizeAggregations parameter may be set to false to cause queries to emit nonfinalized aggregation results. (15010)
- INSERT queries obtain minimally-sized locks rather than locking the entire target datasource. (15003)
- The Query view of the web console now has tabs and an engine selector that allows issuing multi-stage queries. The dedicated "Talaria" view has been removed.
- The web console includes an ingestion spec conversion tool. It performs a best-effort conversion of a native batch ingestion spec into a SQL query. It does not guarantee perfect fidelity, so we recommend that you review the generated SQL query before running it.
- INSERT queries with LIMIT and talariaSegmentGranularity set to "all" now execute properly and write a single segment to the target datasource. Previously, these queries would fail. (15051)
- INSERT queries using sqlReplaceTimeChunks now always produce "range" shard specs when appropriate. Previously, "numbered" shard specs would sometimes be produced instead of "range" shard specs as documented. (14768)
- Initial release.
General query execution
MSQE does not have fault tolerance. If any task fails, the entire query fails.
In case of a worker crash, stage outputs on S3 are not deleted automatically. You may need to delete the files using an external process or create an S3 lifecycle policy to remove the objects under druid.msq.intermediate.storage.prefix. A good start would be to delete the objects after 3 days if they are not removed automatically; a sketch of such a lifecycle rule appears below. (21917)
Only one local filesystem per server is used for stage output data during multi-stage query execution. If your servers have multiple local filesystems, this causes queries to exhaust available disk space earlier than expected. As a workaround, you can use durable storage for shuffle meshes. (16181)
If the maximum number of tasks (msqMaxNumTasks, previously talariaNumTasks) is higher than the total task capacity of the cluster, more tasks may be launched than can run at once. This leads to a TaskStartTimeout error code, as there is never enough capacity to run the query. To avoid this, set msqMaxNumTasks to a number of tasks that can run simultaneously on your cluster. (23242)
When msqTaskAssignment is set to auto, the system generates one task per input file for certain splittable input sources where file sizes are not known ahead of time. This includes the http input source, where the system generates one task per URI. (23163)
INSERT queries can consume excessive memory when using complex types due to inaccurate footprint estimation. This can appear as an OutOfMemoryError during the SegmentGenerator stage when using sketches. If you run into this issue, try manually lowering the number of rows held in memory during segment generation.
EXTERN loads an entire row group into memory at once when reading from Parquet files. Row groups can be up to 1 GB in size, which can lead to excessive heap usage when reading many files in parallel. This can appear as an OutOfMemoryError during stages that read Parquet input files. If you run into this issue, try using a smaller number of worker tasks, or increase the heap size of your Indexers or of your Middle Manager-launched indexing tasks. (17932)
- SELECT query results do not include real-time data until it has been published. (18092)
TIMESTAMP types are formatted as numbers rather than ISO8601 timestamp strings, which differs from Druid's standard result format. (14995)
BOOLEAN types are formatted as numbers like 1 and 0 rather than true and false, which differs from Druid's standard result format. (14995)
TopN is not implemented. The context parameter useApproximateTopN is ignored and always treated as if it were false. Therefore, topN-shaped queries will always run using the groupBy engine. There is no loss of functionality, but there may be a performance impact, since these queries will run using an exact algorithm instead of an approximate one. (14998)
GROUPING SETS is not implemented. Queries that use GROUPING SETS will fail. (14999)
The numeric flavors of the EARLIEST and LATEST aggregators do not work properly. Attempting to use the numeric flavors of these aggregators will lead to an error like java.lang.ClassCastException: class java.lang.Double cannot be cast to class org.apache.druid.collections.SerializablePair. The string flavors, however, do work properly. (15040)
The schemaless dimensions feature is not available. All columns and their types must be specified explicitly. (15004)
Segment metadata queries on datasources ingested with the Multi-Stage Query Engine will return values for timestampSpec that are not usable for introspection. (15007)
Reporting of aggregatorFactories is on a best-effort basis. In particular, Pivot will not be able to automatically create data cubes that properly reflect the rollup configuration if the insert query does not meet the conditions defined in Rollup. Proper data cubes can still be created manually. (20879)
When an INSERT with GROUP BY does not match the criteria mentioned in GROUP BY, the multi-stage engine generates segments that Druid's compaction functionality is not able to further roll up. This applies to auto-compaction as well as manually issued compact tasks. Individual queries executed with the multi-stage engine always guarantee perfect rollup for their output, so this only matters if you are performing a sequence of INSERT queries that each append data to the same time chunk. If necessary, you can compact such data using another SQL query instead of a compact task.
When using INSERT with GROUP BY, splitting of large partitions is not currently implemented. If a single partition key appears in a very large number of rows, an oversized segment will be created. You can mitigate this by adding additional columns to your partition key. Note that partition splitting does work properly when performing INSERT without GROUP BY. (15015)
INSERT with column lists, like INSERT INTO tbl (a, b, c) SELECT ..., is not implemented. (15009)
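For illustration (a sketch with hypothetical table and column names): the explicit column-list form fails, while the usual pattern of naming or aliasing the target columns in the SELECT itself works, since the target table's columns are taken from the SELECT output.

```python
# Not supported: INSERT with an explicit column list (placeholder names).
unsupported = """
INSERT INTO "tbl" (a, b, c)
SELECT a, b, c FROM "src"
PARTITIONED BY ALL
"""

# Instead, name or alias the desired columns in the SELECT itself; the target
# table's columns are taken from the SELECT output. (Sketch only.)
supported = """
INSERT INTO "tbl"
SELECT
  col_a AS a,
  col_b AS b,
  col_c AS c
FROM "src"
PARTITIONED BY ALL
"""
```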
- EXTERN does not accept druid input sources. (15018)
- There is currently no guardrail on the maximum amount of local disk space used for temporary data when durable storage is turned off, so worker tasks may exhaust all available disk space. In this case, you will receive an UnknownError with a message including "No space left on device". (19204)