Asynchronous SQL download (deprecated)
Asynchronous download is deprecated.
Apache Druid supports two query languages: Druid SQL and native queries. This document describes the SQL language.
Asynchronous query download (async download) lets you run longer-executing queries and retrieve the results after the queries complete. It avoids failures that occur when timeouts interrupt the connection between query clients and the Druid cluster.
Async download does not:
- Provide file management APIs. Druid is not a file management system, so it does not expose the file containing the results to users.
- Support long retention periods for query results. Fetch query results as soon as possible after the query completes. The client cannot use deep storage as a query cache layer.
Async download is a beta feature in Imply Hybrid and has known limitations around Druid resource contention. If you plan to use async queries with Imply Hybrid, consider the following recommendations:
- Limit concurrency on asynchronous queries. Minimize the number of async queries running simultaneously, and reduce the overlap between running async queries and interactive queries in Pivot.
- Tune your Druid settings based on your specific query types and hardware specifications. Review Query-specific properties for further information on groupBy and Scan queries, as well as large subqueries. The default Druid settings are not optimized for long-running async queries.
Setup
Async download is disabled by default.
Load the extension in common.runtime.properties to enable async query downloads:
druid.extensions.loadList=["imply-sql-async"]
If you upgrade to the latest version of Imply from a version earlier than 2022.01, wait for your cluster to finish upgrading before you enable async downloads. You cannot enable async downloads during the upgrade.
To use async downloads in Pivot, also enable the async downloads feature flag. For more information on using async downloads in Pivot, see Download data.
Async download configuration
Configure the following async download settings in common.runtime.properties:
- Storage for query results
- Time periods for cleaning query states and result files
- Limits on query execution
We strongly recommend you use a separate Broker tier in your Druid cluster to process async queries. See details in Cluster recommendations.
You may need to tune the default properties of certain query types for use in async downloads. For more information, see Query-specific properties.
Query result storage
You can store query results in local file storage or Amazon S3. Async download stores results locally by default. Do not use local file storage for production environments. Local file storage is only suitable for testing in quickstart environments with a single Broker.
Local storage
Local storage uses a local disk on the Broker to store result files. Do not use local file storage for production environments; it is only suitable for testing in quickstart environments with a single Broker. The async download APIs are not "sticky" when you use local storage. In multi-Broker environments, when you fetch the stored file with the Results API, your request may reach a different Broker than the one that hosts the file, so it may appear that there are no results. Use S3 storage for production environments with multiple Brokers.
config | description | required | default |
---|---|---|---|
druid.query.async.storage.type | Must be set to local. | no | local |
druid.query.async.storage.local.directory | Directory to store query results. | yes when local storage is used | not defined |
Example configuration for local storage:
druid.query.async.storage.type=local
druid.query.async.storage.local.directory=/path/to/your/directory
S3
To store results in Amazon S3, load the S3 extension in common.runtime.properties. For example:
druid.extensions.loadList=["imply-sql-async", "druid-s3-extensions"]
The following S3 permissions are required for pushing or fetching query results:
- s3:GetObject
- s3:PutObject
- s3:AbortMultipartUpload
The following S3 permissions are required for removing expired results:
- s3:DeleteObject
- s3:ListBucket
config | description | required | default |
---|---|---|---|
druid.query.async.storage.type | Must be set to s3. | yes | local |
druid.query.async.storage.s3.bucket | S3 bucket to store query results. | yes | |
druid.query.async.storage.s3.prefix | S3 prefix to store query results. Do not share this path with other objects: the automatic cleanup task deletes any object under this path that it considers expired. | yes | |
druid.query.async.storage.s3.tempDir | Directory path in local disk to store query results temporarily. | yes | |
druid.query.async.storage.s3.maxResultsSize | Max size of each result file. It should be between 5MiB and 5TiB. Supports human-readable format. | no | 100MiB |
druid.query.async.storage.s3.maxTotalResultsSize | Max total size of all query result files. Supports human-readable format. | no | 5GiB |
druid.query.async.storage.s3.chunkSize | This property is intended only for rare cases. It defines the size of each chunk to temporarily store in druid.query.async.storage.s3.tempDir. Druid computes the chunk size automatically when this property is not set. The chunk size must be between 5MiB and 5GiB. | no | the greater of 5MiB or maxTotalResultsSize/10000 |
druid.query.async.storage.s3.maxTriesOnTransientErrors | The maximum number of times to attempt S3 API calls to avoid failures due to transient errors. Only modify this property to support rare cases. | no | 10 |
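The following POSIX shell sketch illustrates the default chunkSize rule from the table above: the greater of 5MiB or maxTotalResultsSize/10000, for a size given in bytes. The function names are hypothetical. The sketch assumes integer division and 1MiB = 1048576 bytes. Druid computes the actual value internally.

```shell
# Hypothetical helpers mirroring the documented chunkSize rules.
# Sizes are in bytes; convert human-readable values such as 5GiB first.
MIN_CHUNK_BYTES=$((5 * 1024 * 1024))         # 5MiB lower bound
MAX_CHUNK_BYTES=$((5 * 1024 * 1024 * 1024))  # 5GiB upper bound

# Default chunk size: the greater of 5MiB or maxTotalResultsSize/10000.
# Integer division is an assumption.
default_chunk_size() {
  candidate=$(($1 / 10000))
  if [ "$candidate" -gt "$MIN_CHUNK_BYTES" ]; then
    echo "$candidate"
  else
    echo "$MIN_CHUNK_BYTES"
  fi
}

# Succeeds if an explicit chunkSize falls within 5MiB to 5GiB.
chunk_size_in_range() {
  [ "$1" -ge "$MIN_CHUNK_BYTES" ] && [ "$1" -le "$MAX_CHUNK_BYTES" ]
}
```

With the default maxTotalResultsSize of 5GiB, default_chunk_size 5368709120 prints 5242880, because 5GiB/10000 is well under 5MiB.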
For example, to store your query results in an S3 bucket with the full path s3://bucket-name/a/long/prefix:
druid.query.async.storage.type=s3
druid.query.async.storage.s3.bucket=bucket-name
druid.query.async.storage.s3.prefix=a/long/prefix
druid.query.async.storage.s3.tempDir=/path/to/your/temp/dir
druid.query.async.storage.s3.maxResultsSize=1GiB
When a result file fails to upload, Druid aborts the upload to clean up partially uploaded files. If the abort fails after a couple of retries, the partially uploaded files may remain in your S3 bucket. To automatically purge these partially uploaded files, define a lifecycle rule using the AbortIncompleteMultipartUpload action. Specify the number of days to wait in the DaysAfterInitiation field. We recommend setting DaysAfterInitiation to one day.
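The following shell sketch writes an S3 lifecycle configuration for the rule described above. The rule aborts incomplete multipart uploads under the async results prefix. The rule ID, function name, and output file are hypothetical. The JSON fields follow the S3 lifecycle configuration format.

```shell
# Hypothetical helper: write a lifecycle rule that aborts incomplete
# multipart uploads under the async results prefix.
write_abort_rule() {
  prefix=$1   # value of druid.query.async.storage.s3.prefix
  days=$2     # DaysAfterInitiation; one day is recommended
  out=$3      # file to write the JSON configuration to
  cat > "$out" <<EOF
{
  "Rules": [
    {
      "ID": "abort-incomplete-async-results",
      "Filter": { "Prefix": "$prefix/" },
      "Status": "Enabled",
      "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": $days }
    }
  ]
}
EOF
}
```

You could then apply the file with the AWS CLI, for example: aws s3api put-bucket-lifecycle-configuration --bucket bucket-name --lifecycle-configuration file://lifecycle.json. This call replaces the bucket's entire existing lifecycle configuration, so merge in any rules you already have first.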
Query state and result file management
Druid automatically cleans up expired query states and result files. For large results with long downloads, Druid periodically resets the retention period during the download to prevent premature cleanup of results.
Async download uses the following Coordinator duties to manage cleanup:
- killAsyncQueryMetadata
- killAsyncQueryResultWithoutMetadata
- updateStaleQueryState
When you turn on async download, Druid enables these duties automatically. Configure these duties through the following properties.
config | description | required | default |
---|---|---|---|
druid.query.async.cleanup.timeToRetain | Retention period of query states and result files. Supports the ISO 8601 duration format. | no | PT60S |
druid.query.async.cleanup.timeToWaitAfterBrokerGone | Duration to wait after the Coordinator service detects a missing Broker. If a Broker goes offline for longer than this duration, the Coordinator labels the query state of queries running in the offline Broker as UNDETERMINED . Supports the ISO 8601 duration format. | no | PT1M |
druid.query.async.cleanup.pollPeriod | Run period for the duty group. Must be in the form PT{n}S, where n is the number of seconds. | no | PT30S |
druid.query.async.readRefreshTime | How often to reset the timeToRetain period while an async query result is being read. Prevents premature cleanup of the query result files during long downloads. Must be at least one second. Supports the ISO 8601 duration format. | no | PT10S |
The Coordinator runs the cleanup duties for query states and results periodically, at the interval set by druid.query.async.cleanup.pollPeriod. The poll period affects time-based configuration for duties. For example, if you set druid.query.async.cleanup.pollPeriod to PT30S, the updateStaleQueryState duty checks whether each query state is expired every 30 seconds. Even if you set druid.query.async.cleanup.timeToWaitAfterBrokerGone to PT40S, you may wait up to 60 seconds (two cycles of the poll period) until your query actually expires.
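The arithmetic in this example can be sketched as a small shell function. As a simplification, the sketch assumes that the Coordinator evaluates a time-based setting only once per poll cycle. Under that assumption, the effective wait is the setting rounded up to a whole number of poll periods. The function name is hypothetical.

```shell
# Hypothetical helper: round a time-based setting up to a whole number of
# poll cycles. Both arguments are in seconds.
worst_case_wait() {
  setting=$1  # e.g. 40 for timeToWaitAfterBrokerGone=PT40S
  poll=$2     # e.g. 30 for pollPeriod=PT30S
  cycles=$(( (setting + poll - 1) / poll ))  # ceiling division
  echo $(( cycles * poll ))
}
```

For the values above, worst_case_wait 40 30 prints 60.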
Query execution limits
To limit query execution with async downloads, set the following properties.
config | description | required | default |
---|---|---|---|
druid.query.async.maxConcurrentQueries | Maximum number of active queries that can run concurrently in each Broker. The Broker queues any queries that exceed the active limit. | no | 10% of the number of physical cores in the Broker |
druid.query.async.maxQueriesToQueue | Maximum number of queries to store in the Broker queue. The Broker rejects any queries that exceed the limit. | no | The greater of 10 or three times maxConcurrentQueries |
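The following shell sketch derives two values from the table above. The first is the default queue size. The second is the number of async queries one Broker can hold (running plus queued) before it rejects new ones. The helper names are hypothetical. The second function is a reading of the table, not a documented formula.

```shell
# Default maxQueriesToQueue: the greater of 10 or 3 * maxConcurrentQueries.
default_max_queries_to_queue() {
  tripled=$(($1 * 3))
  if [ "$tripled" -gt 10 ]; then echo "$tripled"; else echo 10; fi
}

# Queries one Broker can hold at once (running + queued) before it rejects
# new async queries. Arguments: maxConcurrentQueries, maxQueriesToQueue.
max_held_queries() {
  echo $(($1 + $2))
}
```

For example, with maxConcurrentQueries set to 4, the default queue size is 12, so the Broker can hold 16 async queries in total.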
We strongly recommend that you also set druid.broker.http.maxQueuedBytes in broker/runtime.properties when using async downloads. maxQueuedBytes limits the maximum size of intermediate results temporarily stored in Broker memory. When a query reaches this limit, the Broker exerts backpressure on the Historicals so that they pause sending results until the Broker has processed the results it already holds. This backpressure prevents out-of-memory (OOM) errors, so the Broker can fetch even very large intermediate results from data servers. In general, set the maximum number of bytes to approximately 2 MiB times the number of Historicals. See Broker configuration for more information on druid.broker.http.maxQueuedBytes and Basic cluster tuning for details on the recommended setting.
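The following shell sketch applies the rule of thumb above. It prints a starting value for druid.broker.http.maxQueuedBytes based on the number of Historicals. Treat the result only as a starting point under the approximately 2 MiB per Historical guideline. The function name is hypothetical.

```shell
# Hypothetical helper: about 2 MiB per Historical, printed as a property line.
suggest_max_queued_bytes() {
  historicals=$1
  echo "druid.broker.http.maxQueuedBytes=$((2 * 1024 * 1024 * historicals))"
}
```

For a cluster with 5 Historicals, suggest_max_queued_bytes 5 prints druid.broker.http.maxQueuedBytes=10485760.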
Async downloads rely on synchronous calls between the Historical and the Broker within Druid. These calls may time out for a long query, causing your query to fail. See Troubleshooting query execution for more information.
Cluster recommendations
Each Druid query consumes cluster resources, such as processing threads, memory buffers for intermediate query results, and HTTP threads for communication between Brokers and data servers. Async download introduces mixed workloads to Druid, in which long-running async queries run concurrently with short-running interactive queries. Async queries typically return large results and consume more resources than short-running "light" queries. As a result, async queries are likely to throttle the performance of light interactive queries. For example, if you run both sets of queries in the same Druid cluster, async queries may occupy all available HTTP threads. This slows down subsequent queries, both heavy and light, and can cause them to time out.
To improve query concurrency and reduce the impact on Druid's performance, we strongly recommend a separate Broker tier to handle async downloads. Broker tiering allows you to isolate cluster resources between different workloads. To set up Broker tiering, see Configure Druid for mixed workloads.
Query-specific properties
Druid translates SQL queries into native Druid queries, which have default limits on query resource usage. When executed via async downloads, certain queries may exceed their default limits and fail to execute. This section describes the properties you may want to adjust for groupBy native queries, Scan native queries, and large subqueries.
For all queries, also consider tuning the properties below to allow long-running queries and long-running downloads.
The following properties configure long-running query execution.
- Set druid.server.http.defaultQueryTimeout to the maximum length of time you want queries to run. Configure this value in the runtime.properties file for the Broker, Historical, and Overlord. The default value is 300000 milliseconds, or 5 minutes. You cannot submit any async queries when the default query timeout is set to 0.
- Set druid.server.http.maxQueryTimeout to the maximum allowed value for the timeout parameter in the query context. Configure this value in the runtime.properties file for the Broker, Historical, and Overlord. Druid rejects a query if timeout in the query context is greater than this value.
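The two rules above can be expressed as a pre-flight check. This shell sketch is hypothetical. It does not call Druid, which performs its own validation. All values are in milliseconds.

```shell
# Hypothetical pre-flight check mirroring the timeout rules above.
check_query_timeout() {
  default_timeout=$1  # druid.server.http.defaultQueryTimeout
  max_timeout=$2      # druid.server.http.maxQueryTimeout
  requested=$3        # timeout in the query context
  if [ "$default_timeout" -eq 0 ]; then
    echo "async queries cannot be submitted: defaultQueryTimeout is 0"
    return 1
  fi
  if [ "$requested" -gt "$max_timeout" ]; then
    echo "rejected: timeout $requested exceeds maxQueryTimeout $max_timeout"
    return 1
  fi
  echo "ok"
}
```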
The following properties configure long-running query result downloads.
- Set druid.broker.http.readTimeout to the time allotted to the Broker to read data from Historical servers and real-time tasks. Configure this value in the runtime.properties file for the Broker.
- Set druid.router.http.readTimeout to the time allotted to the Router to read data from Broker services. Configure this value in the runtime.properties file for the Router.
Visit the Configuration reference for more information.
GroupBy queries
- druid.processing.numMergeBuffers limits the number of direct memory buffers available for merging query results. This property effectively caps the number of concurrent groupBy v2 queries. The default value is max(2, druid.processing.numThreads / 4). If you submit more groupBy v2 queries than numMergeBuffers on a node, later groupBy v2 queries must wait until merge buffers become available. Because groupBy v2 queries can run for a long time with async downloads, reduce the wait for merge buffers by increasing numMergeBuffers so that more groupBy v2 queries can run concurrently. Before adding more merge buffers, consider the total memory of your machine as well as the size of your Linux disk cache.
- druid.query.groupBy.maxMergingDictionarySize limits the heap space used when merging per-segment results when grouping on strings. The default value is 100 MB. While a higher value may avoid spilling to disk and speed up individual queries, the higher heap memory usage can reduce query concurrency. We recommend leaving the default value as is. The maximum total memory usage in a node is maxMergingDictionarySize times druid.processing.numMergeBuffers.
- druid.query.groupBy.maxOnDiskStorage limits the maximum amount of disk space each query can use if it needs to spill result sets to disk. The default value is 0. Set maxOnDiskStorage to 1000000000 (1 GB) to enable spilling results to disk during aggregation. A higher value allows queries to run even when they return large results that do not fit in memory. The maximum total disk space usage in a node is maxOnDiskStorage times druid.processing.numMergeBuffers.
See more details at GroupBy queries.
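The following shell sketch estimates the per-node impact of these settings. It computes the default numMergeBuffers and the worst-case totals described above. It assumes integer division for numThreads / 4. The function names are hypothetical.

```shell
# Default numMergeBuffers: max(2, numThreads / 4), integer division assumed.
default_merge_buffers() {
  quarter=$(($1 / 4))
  if [ "$quarter" -gt 2 ]; then echo "$quarter"; else echo 2; fi
}

# Worst-case heap for merging: maxMergingDictionarySize * numMergeBuffers.
max_dictionary_heap() { echo $(($1 * $2)); }

# Worst-case spill space: maxOnDiskStorage * numMergeBuffers.
max_spill_disk() { echo $(($1 * $2)); }
```

For example, with 16 processing threads the default is 4 merge buffers. In that case, a 100 MB maxMergingDictionarySize and a 1 GB maxOnDiskStorage could use up to 400000000 bytes of heap and 4000000000 bytes of disk per node.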
Scan queries
Consider your hardware specifications and the expected dimensions of your queries before setting this property. Please consult your Imply representative for assistance if you plan to modify this property.
- druid.query.scan.maxSegmentPartitionsOrderedInMemory limits the number of segments scanned by each Historical when you request time ordering in your Scan query. This value limits the off-heap memory used by the query. To run larger Scan queries, increase maxSegmentPartitionsOrderedInMemory. However, when setting this value, consider your cluster size, the number of segments to read, and the number of columns in the query. For all queries, the off-heap memory usage can be as high as the product of the following factors:
  - maxSegmentPartitionsOrderedInMemory
  - The decompression buffer size, 64 KB
  - The number of decompression buffers, typically 4 when scanning over string columns
  - The number of columns read per query
  - The number of concurrent Scan queries
See more details at Scan queries.
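The following shell sketch computes the product of the factors listed above. It assumes that 64 KB means 65536 bytes and takes the other factors as inputs. The function name is hypothetical.

```shell
# Hypothetical helper: worst-case off-heap bytes for time-ordered Scan queries.
scan_offheap_bytes() {
  partitions=$1  # maxSegmentPartitionsOrderedInMemory
  buffers=$2     # decompression buffers, typically 4 for string columns
  columns=$3     # columns read per query
  concurrent=$4  # concurrent Scan queries
  echo $(( partitions * 65536 * buffers * columns * concurrent ))
}
```

For example, maxSegmentPartitionsOrderedInMemory=50, 4 buffers, 20 columns, and 5 concurrent queries gives 1310720000 bytes, about 1.2 GiB.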
Subqueries
Consider your hardware specifications and the expected dimensions of your queries before setting this property. Please consult your Imply representative for assistance if you plan to modify this property.
- druid.server.http.maxSubqueryRows limits the maximum number of rows from all subqueries belonging to a query. Raise the subquery row limit if you have large subqueries. However, you must also increase the heap size of the Brokers, Historicals, and task Peons to accommodate the subquery results. For all queries, the maximum heap usage scales with maxSubqueryRows times the number of concurrent queries that have subqueries.
See more details at Server configuration.
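Because maxSubqueryRows is a row count, you need an average row size to turn it into a heap estimate. The following shell sketch takes that size as an assumed input. It gives a rough estimate, not a Druid formula, and the function name is hypothetical.

```shell
# Hypothetical rough estimate of subquery heap usage in bytes.
subquery_heap_estimate() {
  max_rows=$1       # druid.server.http.maxSubqueryRows
  concurrent=$2     # concurrent queries that have subqueries
  bytes_per_row=$3  # assumed average in-memory row size for your data
  echo $(( max_rows * concurrent * bytes_per_row ))
}
```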
Metrics
Broker metrics
- async/result/tracked/count: number of queries tracked by Druid.
- async/result/tracked/bytes: total size of query results tracked by Druid.
- async/sqlQuery/running/count: number of running queries.
- async/sqlQuery/queued/count: number of queued queries.
- async/sqlQuery/running/max: maximum number of running queries. Matches druid.query.async.maxConcurrentQueries.
- async/sqlQuery/queued/max: maximum number of queued queries. Matches druid.query.async.maxQueriesToQueue.
Coordinator metrics
- async/cleanup/result/removed/count: number of query result files successfully deleted in each Coordinator run.
- async/cleanup/result/failed/count: number of failed attempts to delete query results in each Coordinator run.
- async/cleanup/metadata/removed/count: number of query states successfully cleaned up in each Coordinator run.
- async/cleanup/metadata/failed/count: number of failed attempts to clean up query states in each Coordinator run.
- async/cleanup/metadata/skipped/count: number of unexpired query states in each Coordinator run.
- async/query/undetermined/count: number of queries marked as UNDETERMINED during each druid.query.async.cleanup.pollPeriod.
Examples
These examples use the Wikipedia data set introduced in the Quickstart. Before trying these examples, enable async download and specify the location to store your results. See Setup for details.
Submit a query
To submit a new query, send the query as a raw JSON string in a POST request:
curl --location --request POST 'http://localhost:8888/druid/v2/sql/async/' \
--header 'Content-Type: application/json' \
--data-raw '{"query" : "SELECT COUNT(*) FROM wikipedia"}'
Output:
{
"asyncResultId": "84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg",
"state": "INITIALIZED"
}
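To script the submit step, the following shell sketch posts a query and prints the asyncResultId from the response. DRUID_URL and the function names are hypothetical. The sed-based parsing assumes the response shape shown above. A JSON tool such as jq is more robust if you have one.

```shell
DRUID_URL=${DRUID_URL:-http://localhost:8888}  # hypothetical variable

# Print the asyncResultId from a submit or status response.
extract_async_id() {
  sed -n 's/.*"asyncResultId"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Submit a SQL query and print its asyncResultId. The query text is pasted
# into the JSON body as-is, so it must not contain double quotes or backslashes.
submit_async_query() {
  curl --silent --request POST "$DRUID_URL/druid/v2/sql/async/" \
    --header 'Content-Type: application/json' \
    --data-raw "{\"query\" : \"$1\"}" | extract_async_id
}
```

For example: id=$(submit_async_query 'SELECT COUNT(*) FROM wikipedia')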
Get query status
To get the status of your query, send a GET request and pass in the query ID as a path parameter. The query ID is located in the field named "asyncResultId".
curl --location --request GET \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg/status'
Output:
{
"asyncResultId": "84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg",
"state": "COMPLETE",
"resultFormat": "object",
"resultLength": 19
}
If you get a 404 Not Found error, your query may have expired. Fetch the query status within the retention period, or configure the query retention period using the druid.query.async.cleanup.timeToRetain property.
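Instead of checking the status by hand, you can poll it. This shell sketch repeats the status request until the state is COMPLETE, FAILED, or UNDETERMINED, the terminal states this page mentions. It assumes that every other state means the query is still in progress. It treats an empty state as a missing or expired query. DRUID_URL and the function names are hypothetical.

```shell
DRUID_URL=${DRUID_URL:-http://localhost:8888}  # hypothetical variable

# Print the value of the "state" field from a status response.
extract_state() {
  sed -n 's/.*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Poll the status endpoint until a terminal state; print the final state.
# Returns 0 only for COMPLETE.
wait_for_query() {
  query_id=$1
  interval=${2:-5}  # seconds between polls; an arbitrary default
  while :; do
    state=$(curl --silent "$DRUID_URL/druid/v2/sql/async/$query_id/status" | extract_state)
    case "$state" in
      COMPLETE) echo "$state"; return 0 ;;
      FAILED|UNDETERMINED) echo "$state"; return 1 ;;
      "") echo "no state returned; the query may have expired" >&2; return 1 ;;
    esac
    sleep "$interval"
  done
}
```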
Get query results
To get the results of your query, send a GET request and pass in the query ID as a path parameter.
curl --location --request GET \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg/results'
Output:
[
{
"Counts": 24433
}
]
If you get a 404 Not Found error, your query may have expired. Fetch the query result within the retention period, or configure the query retention period using the druid.query.async.cleanup.timeToRetain property.
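The following shell sketch downloads the results to a file and reports a 404 as a likely expired query. It treats any 2xx status as success. DRUID_URL and the function name are hypothetical.

```shell
DRUID_URL=${DRUID_URL:-http://localhost:8888}  # hypothetical variable

# Save the results of a query to a file. Any 2xx status counts as success;
# a 404 usually means the query expired (see timeToRetain).
fetch_results() {
  query_id=$1
  out_file=$2
  code=$(curl --silent --output "$out_file" --write-out '%{http_code}' \
    "$DRUID_URL/druid/v2/sql/async/$query_id/results")
  case "$code" in
    2??) return 0 ;;
    404) echo "query $query_id not found; it may have expired" >&2; return 1 ;;
    *) echo "unexpected HTTP status: $code" >&2; return 1 ;;
  esac
}
```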
Delete query
To delete your query, send a DELETE request and pass in the query ID as a path parameter.
curl --location --request DELETE \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg'
Druid returns an HTTP 202 response when the request is accepted. Note that Druid automatically deletes a query once its retention period expires.
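To confirm that a delete request was accepted, this shell sketch checks for the HTTP 202 response described above. DRUID_URL and the function name are hypothetical.

```shell
DRUID_URL=${DRUID_URL:-http://localhost:8888}  # hypothetical variable

# Send a DELETE request; succeed only if Druid answers HTTP 202 (accepted).
delete_query() {
  code=$(curl --silent --output /dev/null --write-out '%{http_code}' \
    --request DELETE "$DRUID_URL/druid/v2/sql/async/$1")
  [ "$code" = "202" ]
}
```

For example: delete_query "$id" && echo "delete accepted"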
Known issues
- When the Broker shuts down gracefully, it marks all queries running in it as FAILED before it completely shuts down. However, the Broker may fail before it updates all the query states, leaving some query states unchanged. In this case, you must manually clean up the queries left in invalid states.
- Even when the Broker shuts down gracefully, it currently doesn't wait for running queries to complete before it completely shuts down. Instead, it simply marks them FAILED. This can cause your async queries to fail during rolling updates.
Learn more
See the following topics for more information:
- Async SQL download API for reference on the async download API.
- Download data for enabling and using async downloads in Pivot.
- Druid SQL API for details on the synchronous SQL API.