API
The Multi-Stage Query Engine is a preview feature available starting in Imply 2022.06. Preview features enable early adopters to benefit from new functionality while providing ongoing feedback to help shape and evolve the feature. All functionality documented on this page is subject to change or removal in future releases. Preview features are provided "as is" and are not subject to Imply SLAs.
Earlier versions of the Multi-Stage Query Engine used the /druid/v2/sql/async/ endpoint. In version 2022.05 and later, the engine uses different endpoints: some actions use the /druid/v2/sql/task endpoint, while others use the /druid/indexer/v1/task/ endpoint. Additionally, you no longer need to set a context parameter for talaria. API calls to the task endpoint use the Multi-Stage Query Engine automatically.
During the preview phase, the enhanced Query view will provide the most stable experience. Use the UI if you do not need a programmatic interface.
Queries for the Multi-Stage Query Engine (MSQE) run as tasks. The action you want to take determines the endpoint you use:
- /druid/v2/sql/task endpoint: Submit a query for ingestion.
- /druid/indexer/v1/task endpoint: Interact with a query, including getting its status, getting its details, or canceling it.
Submit a query
Request
Submit queries using the POST /druid/v2/sql/task/ API.
Currently, MSQE ignores the provided values of resultFormat, header, typesHeader, and sqlTypesHeader. SQL SELECT queries always behave as if resultFormat is "array", header is true, typesHeader is true, and sqlTypesHeader is true.
For queries like the examples in the quickstart, you need to escape characters such as quotation marks (") if you submit the query with a tool like curl. If you use a client that handles JSON for you, such as Python, you don't need to escape them manually. The examples below escape the quotation marks where needed.
The following example is the same query that you submit when you complete Convert a JSON ingestion spec, where you insert data into a table named wikipedia. Make sure you replace username, password, your-instance, and port with the values for your deployment.
POST /druid/v2/sql/task
{
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
}
curl --location --request POST 'https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/' \
--header 'Content-Type: application/json' \
--data-raw '{
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '\''{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}'\'',\n '\''{\"type\": \"json\"}'\'',\n '\''[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\''\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
}'
import json
import requests
url = "https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/"
payload = json.dumps({
"query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/gianm/wikipedia-2016-06-27-sampled.json\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY",
"context": {
"msqNumTasks": 3
}
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Response
{
"taskId": "query-f795a235-4dc7-4fef-abac-3ae3f9686b79",
"state": "RUNNING",
}
Response fields
Field | Description |
---|---|
taskId | Controller task ID. Druid's standard task APIs can be used to interact with this controller task. |
state | Initial state for the query, which is "RUNNING". |
Interact with a query
Because queries run as Overlord tasks, use the task APIs to interact with a query.
When using MSQE, the endpoints you frequently use may include:
- GET /druid/indexer/v1/task/{taskId} to get the query payload
- GET /druid/indexer/v1/task/{taskId}/status to get the query status
- GET /druid/indexer/v1/task/{taskId}/reports to get the query report
- POST /druid/indexer/v1/task/{taskId}/shutdown to cancel a query
For information about the permissions required, see Security.
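For example, a minimal Python sketch of this workflow might look like the following. The placeholder credentials, instance, and taskId are assumptions for illustration; substitute the values for your deployment and the taskId returned when you submitted the query.

```python
import requests

# Placeholders are assumptions for illustration; replace them with your
# deployment details and the taskId returned by POST /druid/v2/sql/task.
BASE_URL = "https://<username>:<password>@<your-instance>:<port>"
task_id = "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"

# Get the query status. Inspect the returned object for the task state
# (RUNNING, SUCCESS, or FAILED).
status = requests.get(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/status").json()
print(status)

# Get the query report, which contains details such as stages and errors.
report = requests.get(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/reports").json()
print(report)

# Cancel the query if needed.
requests.post(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/shutdown")
```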
MSQE reports
Keep the following in mind when using the task API to view reports:
- For SELECT queries, the results are included in the report. At this time, if you want to view results for SELECT queries, you need to retrieve them as a generic map from the report and extract the results (see the sketch after this list).
- Query details are physically stored in the task report for controller tasks.
- If you encounter 500 Server Error or 404 Not Found errors, the task may be in the process of starting up or shutting down.
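As a rough sketch of that result-extraction step, the following Python snippet fetches the report and treats it as a generic map. The key used to look up the results ("results") is an assumption for illustration; the exact location of SELECT results inside the report payload may differ, so inspect the payload if the lookup comes back empty.

```python
import requests

# Placeholders are assumptions, as in the earlier sketch.
BASE_URL = "https://<username>:<password>@<your-instance>:<port>"
task_id = "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"

report = requests.get(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/reports").json()

# Treat the report as a generic map and drill down to the query payload.
payload = report["multiStageQuery"]["payload"]

# The "results" key is an assumed location for SELECT results; if it is not
# present in your version, print the payload keys and inspect the structure.
results = payload.get("results")
if results is not None:
    print(results)
else:
    print(list(payload.keys()))
```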
Context variables
In addition to the Druid SQL context parameters, the Multi-Stage Query Engine (MSQE) supports context variables that are specific to it.
You provide context variables alongside your queries to customize the behavior of the query. The way you provide the variables depends on how you submit your query:
- Druid console: Add your variables before your query like so:

  --:context <key>: <value>
  --:context msqFinalizeAggregations: false
  INSERT INTO myTable ...

- API: Add them in a context section of your JSON object like so:

  {
    "query": "SELECT 1 + 1",
    "context": {
      "<key>": <value>,
      "msqNumTasks": 3
    }
  }
Parameter | Description | Default value |
---|---|---|
msqNumTasks | (SELECT, INSERT, or REPLACE) The Multi-Stage Query Engine executes queries using the indexing service, i.e. using the Overlord + MiddleManager. This property specifies the total number of tasks to launch. The minimum possible value is 2, because at least one controller and one worker are necessary. All tasks must be able to launch simultaneously; if they cannot, the query does not launch and throws a TaskStartTimeout exception after about 10 minutes. | 2 |
msqFinalizeAggregations | (SELECT, INSERT, or REPLACE) Whether Druid will finalize the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see SQL aggregation functions. | true |
msqRowsInMemory | (INSERT or REPLACE) Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, you should stick to the default. It may be necessary to override this if you run into one of the current known issues around memory usage. | 100,000 |
msqSegmentSortOrder | (INSERT or REPLACE) Normally, Druid sorts rows in individual segments using __time first, then the CLUSTERED BY clause. When msqSegmentSortOrder is set, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order. The column list can be provided as comma-separated values or as a JSON array in string form. If your query includes __time, then this list must begin with __time. For example: consider an INSERT query that uses CLUSTERED BY country and has msqSegmentSortOrder set to __time,city. Within each time chunk, Druid assigns rows to segments based on country, and then within each of those segments, Druid sorts those rows by __time first, then city, then country. | empty list |
maxParseExceptions | (SELECT, INSERT, or REPLACE) Maximum number of parse exceptions that are ignored while executing the query before it stops with TooManyWarningsFault. Can be set to -1 to ignore all parse exceptions. | 0 |
msqMode | (SELECT, INSERT, or REPLACE) Execute the query with a predefined, pretuned set of parameters. Can be set to strict or nonStrict. Setting the mode to strict is equivalent to setting maxParseExceptions: 0: the query fails if there is a malformed record. Setting the mode to nonStrict is equivalent to setting maxParseExceptions: -1: the query won't fail regardless of the number of malformed records. | no default value |
msqRowsPerSegment | (INSERT or REPLACE) Number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, you should stick to the default. For general information about sizing rows per segment, see Segment Size Optimization. | 3,000,000 |
msqDurableShuffleStorage | (SELECT, INSERT, or REPLACE) Whether to use durable storage for shuffle mesh. To use this feature, durable storage must be configured at the server level using druid.msq.intermediate.storage.enable=true and its associated properties. If these properties are not configured, any query with the context variable msqDurableShuffleStorage=true fails with a configuration error. | false |
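To illustrate how several of these parameters fit together, here is a hypothetical request context, expressed in Python in the style of the earlier example. The specific values are examples only, not tuning recommendations.

```python
import json
import requests

# Hypothetical values for illustration only; tune them for your workload.
payload = json.dumps({
    "query": "INSERT INTO wikipedia SELECT ... PARTITIONED BY DAY",  # your INSERT query
    "context": {
        "msqNumTasks": 3,                  # one controller plus two workers
        "msqFinalizeAggregations": False,  # store sketch intermediates during ingestion
        "msqRowsPerSegment": 3000000,      # target rows per generated segment
        "maxParseExceptions": 100          # tolerate up to 100 malformed records
    }
})

response = requests.post(
    "https://<username>:<password>@<your-instance>:<port>/druid/v2/sql/task/",
    headers={"Content-Type": "application/json"},
    data=payload,
)
print(response.json())
```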
Report response fields
The following table describes the response fields when you retrieve a report for an MSQE task using the /druid/indexer/v1/task/{taskId}/reports endpoint:
Field | Description |
---|---|
multiStageQuery.taskId | Controller task ID. |
multiStageQuery.payload.status | Query status container. |
multiStageQuery.payload.status.status | RUNNING, SUCCESS, or FAILED. |
multiStageQuery.payload.status.startTime | Start time of the query in ISO format. Only present if the query has started running. |
multiStageQuery.payload.status.durationMs | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet. |
multiStageQuery.payload.status.errorReport | Error object. Only present if there was an error. |
multiStageQuery.payload.status.errorReport.taskId | The task that reported the error, if known. May be a controller task or a worker task. |
multiStageQuery.payload.status.errorReport.host | The hostname and port of the task that reported the error, if known. |
multiStageQuery.payload.status.errorReport.stageNumber | The stage number that reported the error, if it happened during execution of a specific stage. |
multiStageQuery.payload.status.errorReport.error | Error object. Contains errorCode at a minimum, and may contain other fields as described in the error code table. Always present if there is an error. |
multiStageQuery.payload.status.errorReport.error.errorCode | One of the error codes from the error code table. Always present if there is an error. |
multiStageQuery.payload.status.errorReport.error.errorMessage | User-friendly error message. Not always present, even if there is an error. |
multiStageQuery.payload.status.errorReport.exceptionStackTrace | Java stack trace in string form, if the error was due to a server-side exception. |
multiStageQuery.payload.stages | Array of query stages. |
multiStageQuery.payload.stages[].stageNumber | Each stage has a number that differentiates it from other stages. |
multiStageQuery.payload.stages[].inputStages | Array of input stage numbers. |
multiStageQuery.payload.stages[].stageType | String that describes the logic of this stage. This is not necessarily unique across stages. |
multiStageQuery.payload.stages[].phase | Either NEW, READING_INPUT, POST_READING, RESULTS_COMPLETE, or FAILED. Only present if the stage has started. |
multiStageQuery.payload.stages[].workerCount | Number of parallel tasks that this stage is running on. Only present if the stage has started. |
multiStageQuery.payload.stages[].partitionCount | Number of output partitions generated by this stage. Only present if the stage has started and has computed its number of output partitions. |
multiStageQuery.payload.stages[].inputFileCount | Number of external input files or Druid segments read by this stage. Does not include inputs from other stages in the same query. |
multiStageQuery.payload.stages[].startTime | Start time of this stage. Only present if the stage has started. |
multiStageQuery.payload.stages[].query | Native Druid query for this stage. Only present for the first stage that corresponds to a particular native Druid query. |
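To make these fields concrete, the following sketch walks a fetched report and prints the status, any error, and a per-stage summary. It assumes the same placeholders as the earlier sketches; the field names follow the table above, and the traversal is illustrative rather than exhaustive.

```python
import requests

# Placeholders are assumptions, as in the earlier sketches.
BASE_URL = "https://<username>:<password>@<your-instance>:<port>"
task_id = "query-f795a235-4dc7-4fef-abac-3ae3f9686b79"

report = requests.get(f"{BASE_URL}/druid/indexer/v1/task/{task_id}/reports").json()
payload = report["multiStageQuery"]["payload"]

status = payload["status"]
print("state:", status["status"])            # RUNNING, SUCCESS, or FAILED
print("elapsed ms:", status["durationMs"])   # -1 if the query hasn't started running

error_report = status.get("errorReport")
if error_report:
    error = error_report["error"]
    print("error code:", error["errorCode"])
    print("message:", error.get("errorMessage", "<none>"))

for stage in payload.get("stages", []):
    print(stage["stageNumber"], stage["stageType"], stage.get("phase", "<not started>"))
```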
Error codes
The following table describes error codes you may encounter in the multiStageQuery.payload.status.errorReport.error.errorCode
field when using MSQE:
Code | Meaning | Additional fields |
---|---|---|
BroadcastTablesTooLarge | The size of the broadcast tables used on the right-hand side of the joins exceeded the memory reserved for them in a worker task. | • maxBroadcastTablesSize: Memory reserved for the broadcast tables, measured in bytes |
Canceled | The query was canceled. Common reasons for cancellation include a user-initiated shutdown of the controller task via the /druid/indexer/v1/task/{taskId}/shutdown API, and a restart or failure of the server process that was running the controller task. | |
CannotParseExternalData | A worker task could not parse data from an external datasource. | |
DurableStorageConfiguration | Durable storage mode could not be activated due to a misconfiguration. Check durable storage for shuffle mesh for instructions on configuration. | |
ColumnTypeNotSupported | The query tried to use a column type that is not supported by the frame format. This currently occurs with ARRAY types, which are not yet implemented for frames. | • columnName • columnType |
InsertCannotAllocateSegment | The controller task could not allocate a new segment ID due to conflict with pre-existing segments or pending segments. Two common reasons for such conflicts are attempting to mix different segment granularities within the same intervals of the same datasource, and prior ingestions that used non-extendable shard specs. | • dataSource • interval: interval for the attempted new segment allocation |
InsertCannotBeEmpty | An INSERT or REPLACE query did not generate any output rows, in a situation where output rows are required for success. Can happen for INSERT or REPLACE queries with PARTITIONED BY set to something other than ALL or ALL TIME. | • dataSource |
InsertCannotOrderByDescending | An INSERT query contained a CLUSTERED BY expression with descending order. Currently, Druid's segment generation code only supports ascending order. | • columnName |
InsertCannotReplaceExistingSegment | A REPLACE query or an INSERT query with sqlReplaceTimeChunks set cannot proceed because an existing segment partially overlaps those bounds and the portion within those bounds is not fully overshadowed by query results. There are two ways to address this without modifying your query: shrink sqlReplaceTimeChunks to match the query results, or expand sqlReplaceTimeChunks to fully contain the existing segment. | • segmentId: the existing segment |
InsertLockPreempted | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a realtime ingestion task. | |
InsertTimeNull | An INSERT or REPLACE query encountered a null timestamp in the __time field. This can happen due to using an expression like TIME_PARSE(timestamp) AS __time with an unparseable timestamp. (TIME_PARSE returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. If your timestamps may genuinely be null, consider using COALESCE to provide a default value. One option is CURRENT_TIMESTAMP, which represents the start time of the job. | |
InsertTimeOutOfBounds | A REPLACE query or an INSERT query with sqlReplaceTimeChunks generated a timestamp outside the bounds of the sqlReplaceTimeChunks parameter. To avoid this error, consider adding a WHERE filter to only select times from the chunks that you want to replace. | • interval: time chunk interval corresponding to the out-of-bounds timestamp |
QueryNotSupported | QueryKit could not translate the provided native query to a multi-stage query. This can happen if the query uses features that the multi-stage engine does not yet support, like GROUPING SETS. | |
RowTooLarge | The query tried to process a row that was too large to write to a single frame. See the Limits table for the specific limit on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | • maxFrameSize: the limit on frame size which was exceeded |
TaskStartTimeout | Unable to launch all the worker tasks in time. There might be insufficient available slots to start all the worker tasks simultaneously. Try splitting the query into smaller chunks that require a lower msqNumTasks, or increase capacity. | |
TooManyBuckets | Too many partition buckets for a stage. Currently, partition buckets are only used for segmentGranularity during INSERT queries. The most common reason for this error is that your segmentGranularity is too narrow relative to the data. See the Limits table for the specific limit. | • maxBuckets: the limit on buckets which was exceeded |
TooManyInputFiles | Too many input files/segments per worker. See the Limits table for the specific limit. | • numInputFiles: the total number of input files/segments for the stage • maxInputFiles: the maximum number of input files/segments per worker per stage • minNumWorker: the minimum number of workers required for a successful run |
TooManyPartitions | Too many partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the Limits table for the specific limit. | • maxPartitions: the limit on partitions which was exceeded |
TooManyColumns | Too many output columns for a stage. See the Limits table for the specific limit. | • maxColumns: the limit on columns which was exceeded |
TooManyWarnings | Too many warnings of a particular type generated. | • errorCode: The errorCode corresponding to the exception that exceeded the required limit. • maxWarnings: Maximum number of warnings that are allowed for the corresponding errorCode. |
TooManyWorkers | Too many workers running simultaneously. See the Limits table for the specific limit. | • workers: a number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage, if multiple stages are running simultaneously. • maxWorkers: the hard or soft limit on workers which was exceeded |
NotEnoughMemory | Not enough memory to launch a stage. | • serverMemory: the amount of memory available to a single process • serverWorkers: the number of workers running in a single process • serverThreads: the number of threads in a single process |
WorkerFailed | A worker task failed unexpectedly. | • workerTaskId: the id of the worker task |
WorkerRpcFailed | A remote procedure call to a worker task failed unrecoverably. | • workerTaskId: the id of the worker task |
UnknownError | All other errors. | |
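As a rough illustration of how a client might react to a few of these codes, the sketch below maps some error codes to possible follow-up actions. The mapping simply restates suggestions from the table above and is not an official retry policy; the helper name is hypothetical.

```python
def suggest_action(error_code: str) -> str:
    """Map a handful of MSQE error codes to possible follow-up actions.

    The suggestions restate the descriptions in the error code table above;
    this is an illustrative helper, not an official retry policy.
    """
    suggestions = {
        "TaskStartTimeout": "Lower msqNumTasks or add task slots so all tasks can start.",
        "CannotParseExternalData": "Check the input data, or raise maxParseExceptions to skip bad records.",
        "TooManyBuckets": "Use a coarser segmentGranularity relative to the data.",
        "InsertTimeNull": "Fix unparseable timestamps, or COALESCE __time to a default value.",
        "WorkerFailed": "Inspect the failed worker task and retry the query.",
    }
    return suggestions.get(error_code, "See the error code table for details.")


# Example: a hypothetical error object shaped like
# multiStageQuery.payload.status.errorReport.error.
error = {"errorCode": "TaskStartTimeout", "errorMessage": "Unable to launch all tasks."}
print(suggest_action(error["errorCode"]))
```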