API

The Multi-Stage Query Engine is a preview feature available starting in Imply 2022.06. Preview features enable early adopters to benefit from new functionality while providing ongoing feedback to help shape and evolve the feature. All functionality documented on this page is subject to change or removal in future releases. Preview features are provided "as is" and are not subject to Imply SLAs.

Earlier versions of the Multi-Stage Query Engine used the /druid/v2/sql/async/ endpoint. In version 2022.05 and later, the engine uses different endpoints: some actions use /druid/v2/sql/task, while others use /druid/indexer/v1/task/. Additionally, you no longer need to set a context parameter for talaria; API calls to the task endpoint use the Multi-Stage Query Engine automatically.

During the preview phase, the enhanced Query view will provide the most stable experience. Use the UI if you do not need a programmatic interface.

Queries for the Multi-Stage Query Engine (MSQE) run as tasks. The action you want to take determines the endpoint you use:

  • /druid/v2/sql/task endpoint: Submit a query for ingestion.
  • /druid/indexer/v1/task endpoint: Interact with a query, including getting its status, getting its details, or canceling it.

Submit a query

Request

Submit queries using the POST /druid/v2/sql/task/ API.

Currently, MSQE ignores the provided values of resultFormat, header, typesHeader, and sqlTypesHeader. SQL SELECT queries always behave as if resultFormat is "array", header is true, typesHeader is true, and sqlTypesHeader is true.

For queries like the examples in the quickstart, you need to escape characters such as quotation marks (") if you submit the request with a tool like curl. If you use a method that handles JSON natively, such as Python, you don't need to escape them. The examples below show the escaped form.

The following example is the same query that you submit when you complete Convert a JSON ingestion spec where you insert data into a table named wikipedia. Make sure you replace username, password, your-instance, and port with the values for your deployment.

The following examples show the request as raw HTTP, as a curl command, and as a Python script.

HTTP:

POST /druid/v2/sql/task
{
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
}
curl:

curl --location --request POST 'https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/' \
--header 'Content-Type: application/json' \
--data-raw '{
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '
\''{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}'\'',\n '\''{\"type\": \"json\"}'\'',\n '\''[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\''\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
Python:

import json
import requests

url = "https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/"

payload = json.dumps({
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
})
headers = {
'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Response

{
  "taskId": "query-f795a235-4dc7-4fef-abac-3ae3f9686b79",
  "state": "RUNNING"
}

Response fields

  • taskId: Controller task ID. Druid's standard task APIs can be used to interact with this controller task.
  • state: Initial state for the query, which is "RUNNING".
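
For example, assuming you submitted the query with the Python example above (so response holds the HTTP response), a minimal sketch that captures the task ID for use with the task APIs described in the next section might look like this:

    # Parse the submit response and keep the controller task ID for follow-up calls.
    # Assumes `response` is the requests.Response object from the Python submit example.
    submit_result = response.json()
    task_id = submit_result["taskId"]  # e.g. "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"
    print(f"Submitted MSQE query as task {task_id}; initial state: {submit_result['state']}")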

Interact with a query

Because queries run as Overlord tasks, use the task APIs to interact with a query.

When using MSQE, the endpoints you frequently use may include:

  • GET /druid/indexer/v1/task/{taskId} to get the query payload
  • GET /druid/indexer/v1/task/{taskId}/status to get the query status
  • GET /druid/indexer/v1/task/{taskId}/reports to get the query report
  • POST /druid/indexer/v1/task/{taskId}/shutdown to cancel a query

For information about the permissions required, see Security.
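
As a minimal sketch (not an official client), assuming the same placeholder credentials and host as the submit example and a task ID taken from the submit response, you could check the status of a query and cancel it like this:

    import requests

    # Base URL for the Overlord task APIs; replace the placeholders as in the submit example.
    base = "https://<username>:<password>@<your-instance>:<port>/druid/indexer/v1/task"
    task_id = "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"  # taskId from the submit response

    # Check the status of the query.
    status = requests.get(f"{base}/{task_id}/status")
    print(status.json())

    # Cancel the query (for example, if it is taking too long).
    # requests.post(f"{base}/{task_id}/shutdown")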

MSQE reports

Keep the following in mind when using the task API to view reports:

  • For SELECT queries, the results are included in the report. At this time, if you want to view results for SELECT queries, you need to retrieve them as a generic map from the report and extract the results.
  • Query details are physically stored in the task report for controller tasks.
  • If you encounter 500 Server Error or 404 Not Found errors, the task may be in the process of starting up or shutting down.
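
Putting those points together, here is a hedged sketch, assuming the same base URL and task ID as above, that fetches the report, retries briefly around startup or shutdown errors, and reads the documented status field from the generic map:

    import time
    import requests

    base = "https://<username>:<password>@<your-instance>:<port>/druid/indexer/v1/task"
    task_id = "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"  # taskId from the submit response

    # The report may briefly return 404 or 500 while the task starts up or shuts down,
    # so retry a few times before giving up.
    report = None
    for _ in range(5):
        resp = requests.get(f"{base}/{task_id}/reports")
        if resp.status_code == 200:
            report = resp.json()
            break
        time.sleep(10)

    if report is not None:
        # Navigate the report as a generic map using the documented field paths.
        query_state = report["multiStageQuery"]["payload"]["status"]["status"]
        print(f"Query state: {query_state}")  # RUNNING, SUCCESS, or FAILED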

Context variables

In addition to the Druid SQL context parameters, the Multi-Stage Query Engine (MSQE) supports context variables that are specific to it.

You provide context variables alongside your queries to customize the behavior of the query. The way you provide the variables depends on how you submit your query:

  • Druid console: Add your variables before your query like so:

    --:context <key>: <value> 
    --:context msqFinalizeAggregations: false
    INSERT INTO myTable
    ...
    
  • API: Add them in a context section of your JSON object like so:

    {
     "query": "SELECT 1 + 1",
     "context": {
         "<key>": <value>,
         "msqNumTasks": 3
     }
    }
    
The following list describes the supported parameters, the query types they apply to, and their default values:

  • msqNumTasks (SELECT, INSERT, or REPLACE). Default: 2.
    The Multi-Stage Query Engine executes queries using the indexing service, that is, the Overlord and MiddleManagers. This property specifies the total number of tasks to launch. The minimum possible value is 2, because at least one controller and one worker are necessary. All tasks must be able to launch simultaneously; if they cannot, the query does not launch and throws a TaskStartTimeout error after about 10 minutes.

  • msqFinalizeAggregations (SELECT, INSERT, or REPLACE). Default: true.
    Whether Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than the finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see SQL aggregation functions.

  • msqRowsInMemory (INSERT or REPLACE). Default: 100,000.
    Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, you should stick to the default. It may be necessary to override this if you run into one of the current known issues around memory usage.

  • msqSegmentSortOrder (INSERT or REPLACE). Default: empty list.
    Normally, Druid sorts rows in individual segments using __time first, then the CLUSTERED BY clause. When msqSegmentSortOrder is set, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order. The column list can be provided as comma-separated values or as a JSON array in string form. If your query includes __time, then this list must begin with __time. For example, consider an INSERT query that uses CLUSTERED BY country and has msqSegmentSortOrder set to __time,city. Within each time chunk, Druid assigns rows to segments based on country, and then within each of those segments, Druid sorts rows by __time first, then city, then country.

  • maxParseExceptions (SELECT, INSERT, or REPLACE). Default: 0.
    Maximum number of parse exceptions that are ignored while executing the query before it stops with TooManyWarningsFault. Can be set to -1 to ignore all parse exceptions.

  • msqMode (SELECT, INSERT, or REPLACE). No default value.
    Executes the query with a predefined, pretuned set of parameters. Can be set to strict or nonStrict. Setting the mode to strict is equivalent to setting maxParseExceptions: 0: the query fails if there is a malformed record. Setting the mode to nonStrict is equivalent to setting maxParseExceptions: -1: the query does not fail regardless of the number of malformed records.

  • msqRowsPerSegment (INSERT or REPLACE). Default: 3,000,000.
    Number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, you should stick to the default. For general information about sizing rows per segment, see Segment Size Optimization.

  • msqDurableShuffleStorage (SELECT, INSERT, or REPLACE). Default: false.
    Whether to use durable storage for shuffle mesh. To use this feature, durable storage must be configured at the server level using druid.msq.intermediate.storage.enable=true and its associated properties. If these properties are not configured, any query with the context variable msqDurableShuffleStorage=true fails with a configuration error.
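
To illustrate how these fit into a request, the following sketch combines several of the parameters above in one context; the specific query and values are illustrative assumptions, not recommendations:

    import json
    import requests

    url = "https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/"

    # Illustrative context combining several of the parameters described above.
    payload = json.dumps({
        "query": "INSERT INTO myTable SELECT ...",  # placeholder for your INSERT or REPLACE query
        "context": {
            "msqNumTasks": 3,                  # one controller plus two workers
            "msqFinalizeAggregations": False,  # store sketch intermediates during ingestion
            "msqRowsPerSegment": 3000000,      # target rows per segment
            "maxParseExceptions": 10           # tolerate up to 10 malformed records
        }
    })

    response = requests.post(url, headers={"Content-Type": "application/json"}, data=payload)
    print(response.text)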

Report response fields

The following list describes the response fields when you retrieve a report for an MSQE task using the /druid/indexer/v1/task/{taskId}/reports endpoint:

  • multiStageQuery.taskId: Controller task ID.
  • multiStageQuery.payload.status: Query status container.
  • multiStageQuery.payload.status.status: RUNNING, SUCCESS, or FAILED.
  • multiStageQuery.payload.status.startTime: Start time of the query in ISO format. Only present if the query has started running.
  • multiStageQuery.payload.status.durationMs: Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet.
  • multiStageQuery.payload.status.errorReport: Error object. Only present if there was an error.
  • multiStageQuery.payload.status.errorReport.taskId: The task that reported the error, if known. May be a controller task or a worker task.
  • multiStageQuery.payload.status.errorReport.host: The hostname and port of the task that reported the error, if known.
  • multiStageQuery.payload.status.errorReport.stageNumber: The stage number that reported the error, if it happened during execution of a specific stage.
  • multiStageQuery.payload.status.errorReport.error: Error object. Contains errorCode at a minimum, and may contain other fields as described in the error code table. Always present if there is an error.
  • multiStageQuery.payload.status.errorReport.error.errorCode: One of the error codes from the error code table. Always present if there is an error.
  • multiStageQuery.payload.status.errorReport.error.errorMessage: User-friendly error message. Not always present, even if there is an error.
  • multiStageQuery.payload.status.errorReport.exceptionStackTrace: Java stack trace in string form, if the error was due to a server-side exception.
  • multiStageQuery.payload.stages: Array of query stages.
  • multiStageQuery.payload.stages[].stageNumber: Each stage has a number that differentiates it from other stages.
  • multiStageQuery.payload.stages[].inputStages: Array of input stage numbers.
  • multiStageQuery.payload.stages[].stageType: String that describes the logic of this stage. This is not necessarily unique across stages.
  • multiStageQuery.payload.stages[].phase: Either NEW, READING_INPUT, POST_READING, RESULTS_COMPLETE, or FAILED. Only present if the stage has started.
  • multiStageQuery.payload.stages[].workerCount: Number of parallel tasks that this stage is running on. Only present if the stage has started.
  • multiStageQuery.payload.stages[].partitionCount: Number of output partitions generated by this stage. Only present if the stage has started and has computed its number of output partitions.
  • multiStageQuery.payload.stages[].inputFileCount: Number of external input files or Druid segments read by this stage. Does not include inputs from other stages in the same query.
  • multiStageQuery.payload.stages[].startTime: Start time of this stage. Only present if the stage has started.
  • multiStageQuery.payload.stages[].query: Native Druid query for this stage. Only present for the first stage that corresponds to a particular native Druid query.

Error codes

The following list describes error codes you may encounter in the multiStageQuery.payload.status.errorReport.error.errorCode field when using MSQE:

  • BroadcastTablesTooLarge: The size of the broadcast tables used in the right-hand side of joins exceeded the memory reserved for them in a worker task.
    Additional fields: maxBroadcastTablesSize (memory reserved for the broadcast tables, measured in bytes).
  • Canceled: The query was canceled. Common reasons for cancellation:
      • User-initiated shutdown of the controller task via the /druid/indexer/v1/task/{taskId}/shutdown API.
      • Restart or failure of the server process that was running the controller task.
  • CannotParseExternalData: A worker task could not parse data from an external datasource.
  • DurableStorageConfiguration: Durable storage mode could not be activated due to a misconfiguration. Check durable storage for shuffle mesh for instructions on configuration.
  • ColumnTypeNotSupported: The query tried to use a column type that is not supported by the frame format. This currently occurs with ARRAY types, which are not yet implemented for frames.
    Additional fields: columnName, columnType.
  • InsertCannotAllocateSegment: The controller task could not allocate a new segment ID due to a conflict with pre-existing segments or pending segments. Two common reasons for such conflicts:
      • Attempting to mix different granularities in the same intervals of the same datasource.
      • Prior ingestions that used non-extendable shard specs.
    Additional fields: dataSource, interval (the interval for the attempted new segment allocation).
  • InsertCannotBeEmpty: An INSERT or REPLACE query did not generate any output rows, in a situation where output rows are required for success. Can happen for INSERT or REPLACE queries with PARTITIONED BY set to something other than ALL or ALL TIME.
    Additional fields: dataSource.
  • InsertCannotOrderByDescending: An INSERT query contained a CLUSTERED BY expression with descending order. Currently, Druid's segment generation code only supports ascending order.
    Additional fields: columnName.
  • InsertCannotReplaceExistingSegment: A REPLACE query, or an INSERT query with sqlReplaceTimeChunks set, cannot proceed because an existing segment partially overlaps those bounds and the portion within those bounds is not fully overshadowed by query results. There are two ways to address this without modifying your query:
      • Shrink sqlReplaceTimeChunks to match the query results.
      • Expand sqlReplaceTimeChunks to fully contain the existing segment.
    Additional fields: segmentId (the existing segment).
  • InsertLockPreempted: An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a realtime ingestion task.
  • InsertTimeNull: An INSERT or REPLACE query encountered a null timestamp in the __time field. This can happen due to using an expression like TIME_PARSE(timestamp) AS __time with an unparseable timestamp. (TIME_PARSE returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. If your timestamps may genuinely be null, consider using COALESCE to provide a default value. One option is CURRENT_TIMESTAMP, which represents the start time of the job.
  • InsertTimeOutOfBounds: A REPLACE query, or an INSERT query with sqlReplaceTimeChunks, generated a timestamp outside the bounds of the sqlReplaceTimeChunks parameter. To avoid this error, consider adding a WHERE filter to only select times from the chunks that you want to replace.
    Additional fields: interval (the time chunk interval corresponding to the out-of-bounds timestamp).
  • QueryNotSupported: QueryKit could not translate the provided native query to a multi-stage query. This can happen if the query uses features that the multi-stage engine does not yet support, like GROUPING SETS.
  • RowTooLarge: The query tried to process a row that was too large to write to a single frame. See the Limits table for the specific limit on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing.
    Additional fields: maxFrameSize (the limit on frame size that was exceeded).
  • TaskStartTimeout: Unable to launch all the worker tasks in time. There might be insufficient available slots to start all the worker tasks simultaneously. Try splitting up the query into smaller chunks with a smaller msqNumTasks value, or increase capacity.
  • TooManyBuckets: Too many partition buckets for a stage. Currently, partition buckets are only used for segmentGranularity during INSERT queries. The most common reason for this error is that your segmentGranularity is too narrow relative to the data. See the Limits table for the specific limit.
    Additional fields: maxBuckets (the limit on buckets that was exceeded).
  • TooManyInputFiles: Too many input files/segments per worker. See the Limits table for the specific limit.
    Additional fields: numInputFiles (the total number of input files/segments for the stage), maxInputFiles (the maximum number of input files/segments per worker per stage), minNumWorker (the minimum number of workers required for a successful run).
  • TooManyPartitions: Too many partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the Limits table for the specific limit.
    Additional fields: maxPartitions (the limit on partitions that was exceeded).
  • TooManyColumns: Too many output columns for a stage. See the Limits table for the specific limit.
    Additional fields: maxColumns (the limit on columns that was exceeded).
  • TooManyWarnings: Too many warnings of a particular type generated.
    Additional fields: errorCode (the errorCode corresponding to the exception that exceeded the required limit), maxWarnings (the maximum number of warnings allowed for the corresponding errorCode).
  • TooManyWorkers: Too many workers running simultaneously. See the Limits table for the specific limit.
    Additional fields: workers (the number of simultaneously running workers that exceeded a hard or soft limit; this may be larger than the number of workers in any one stage if multiple stages are running simultaneously), maxWorkers (the hard or soft limit on workers that was exceeded).
  • NotEnoughMemory: Not enough memory to launch a stage.
    Additional fields: serverMemory (the amount of memory available to a single process), serverWorkers (the number of workers running in a single process), serverThreads (the number of threads in a single process).
  • WorkerFailed: A worker task failed unexpectedly.
    Additional fields: workerTaskId (the ID of the worker task).
  • WorkerRpcFailed: A remote procedure call to a worker task failed unrecoverably.
    Additional fields: workerTaskId (the ID of the worker task).
  • UnknownError: All other errors.
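
As an illustration only, assuming report is the parsed JSON from the /druid/indexer/v1/task/{taskId}/reports endpoint as in the earlier sketch, you could branch on a few of these codes like this:

    # Assumes `report` holds the parsed report JSON retrieved earlier.
    status = report["multiStageQuery"]["payload"]["status"]
    error_report = status.get("errorReport")  # only present if there was an error

    if status["status"] == "FAILED" and error_report is not None:
        code = error_report["error"]["errorCode"]             # always present when there is an error
        message = error_report["error"].get("errorMessage")   # not always present
        if code == "TaskStartTimeout":
            print("Not enough task slots; lower msqNumTasks or add capacity.")
        elif code == "TooManyWarnings":
            print("Raise maxParseExceptions or clean up the input data.")
        else:
            print(f"Query failed with {code}: {message}")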