Asynchronous SQL download (deprecated)

Asynchronous download is deprecated.

Apache Druid supports two query languages: Druid SQL and native queries. This document describes the SQL language.

Asynchronous query download (async download) lets you run longer-executing queries and retrieve the results after the queries complete. It addresses the interruptions that occur when timeouts break the connection between query clients and the Druid cluster.

Async download does not:

  • Provide file management APIs. Druid is not a file management system, so it does not expose the concept of a file containing the results to users.
  • Support long retention periods for query results. Write your client to fetch query results as soon as possible. You cannot use deep storage as a query cache layer.

Async download is a beta feature in Imply Hybrid and has known limitations around Druid resource contention. If you plan to use async queries with Imply Hybrid, consider the following recommendations:

  • Limit concurrency for asynchronous queries. Minimize the number of async queries running simultaneously, and reduce their overlap with interactive queries in Pivot.
  • Tune your Druid settings based on your specific query types and hardware specification. Review Query-specific properties for further information on groupBy and Scan queries, as well as large subqueries. The default Druid settings are not optimized for long-running async queries.

Setup

Async download is disabled by default. To enable async query downloads, load the extension in common.runtime.properties:

druid.extensions.loadList=["imply-sql-async"]

If you update to the latest version of Imply from an Imply version older than 2022.01, wait for your cluster to finish updating before enabling async downloads. You cannot enable async downloads during the upgrade.

To use async downloads in Pivot, also enable the async downloads feature flag. For more information on using async downloads in Pivot, see Download data.

Async download configuration

Configure the async download settings in common.runtime.properties as described in the following sections.

We strongly recommend you use a separate Broker tier in your Druid cluster to process async queries. See details in Cluster recommendations.

You may need to tune the default properties of certain query types for use in async downloads. For more information, see Query-specific properties.

Query result storage

You can store query results in local file storage or Amazon S3. Async download stores results locally by default. Do not use local file storage for production environments. Local file storage is only suitable for testing in quickstart environments with a single Broker.

Local storage

Local storage uses a local disk on the Broker to store result files. The async download APIs are not "sticky" when you use local storage: in multi-Broker environments, a request to the Results API may reach a different Broker than the one that hosts the result file, so it may appear that you have no results. Use S3 storage for production environments with multiple Brokers.

The following properties configure local result storage:

  • druid.query.async.storage.type - Must be set to local. Required: no. Default: local.
  • druid.query.async.storage.local.directory - Directory to store query results. Required: yes when local storage is used. Default: not defined.

Example configuration for local storage:

druid.query.async.storage.type=local
druid.query.async.storage.local.directory=/path/to/your/directory

S3

To store results in Amazon S3, load the S3 extension in the common.runtime.properties. For example:

druid.extensions.loadList=["imply-sql-async", "druid-s3-extensions"]

The following S3 permissions are required for pushing or fetching query results:

  • s3:GetObject
  • s3:PutObject
  • s3:AbortMultipartUpload

The following S3 permissions are required for removing expired results:

  • s3:DeleteObject
  • s3:ListBucket
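
To grant these permissions, you can attach an IAM policy along the lines of the following sketch. The bucket name and prefix are placeholders taken from the example configuration later in this topic; adjust the resources to match your own bucket and prefix:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AsyncQueryResultObjects",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:AbortMultipartUpload",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::bucket-name/a/long/prefix/*"
    },
    {
      "Sid": "AsyncQueryResultBucketListing",
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::bucket-name"
    }
  ]
}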

The following properties configure S3 result storage:

  • druid.query.async.storage.type - Must be set to s3. Required: yes. Default: local.
  • druid.query.async.storage.s3.bucket - S3 bucket to store query results. Required: yes.
  • druid.query.async.storage.s3.prefix - S3 prefix to store query results. Do not share this path with other objects; the automatic cleanup task deletes all objects under the path once it considers them expired. Required: yes.
  • druid.query.async.storage.s3.tempDir - Directory path on local disk where query results are stored temporarily. Required: yes.
  • druid.query.async.storage.s3.maxResultsSize - Maximum size of each result file. Should be between 5 MiB and 5 TiB. Supports human-readable format. Required: no. Default: 100MiB.
  • druid.query.async.storage.s3.maxTotalResultsSize - Maximum total size of all query result files. Supports human-readable format. Required: no. Default: 5GiB.
  • druid.query.async.storage.s3.chunkSize - Intended only to support rare cases. Defines the size of each chunk to store temporarily in druid.query.async.storage.s3.tempDir. Druid computes the chunk size automatically when this property is not set. The chunk size must be between 5 MiB and 5 GiB. Required: no. Default: the greater of 5 MiB or maxTotalResultsSize / 10,000.
  • druid.query.async.storage.s3.maxTriesOnTransientErrors - Maximum number of times to attempt S3 API calls to avoid failures due to transient errors. Only modify this property to support rare cases. Required: no. Default: 10.

For example, to store your query results in an S3 bucket with the full path s3://bucket-name/a/long/prefix:

druid.query.async.storage.type=s3
druid.query.async.storage.s3.bucket=bucket-name
druid.query.async.storage.s3.prefix=a/long/prefix
druid.query.async.storage.s3.tempDir=/path/to/your/temp/dir
druid.query.async.storage.s3.maxResultsSize=1GiB

When a result file fails to upload, Druid aborts the upload to clean up partially uploaded files. If the abort fails after a couple of retries, the partially uploaded files may remain in your S3 bucket. To automatically purge these partially uploaded files, define a lifecycle rule using the AbortIncompleteMultipartUpload action. Specify the number of days to wait in the DaysAfterInitiation field. We recommend setting DaysAfterInitiation to one day.
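
For example, a lifecycle configuration sketch in the JSON form accepted by the AWS CLI command aws s3api put-bucket-lifecycle-configuration might look like the following. The rule ID is arbitrary and the prefix matches the example configuration above; adjust both to your setup:

{
  "Rules": [
    {
      "ID": "abort-incomplete-async-result-uploads",
      "Status": "Enabled",
      "Filter": { "Prefix": "a/long/prefix/" },
      "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 1 }
    }
  ]
}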

Query state and result file management

Druid automatically cleans up expired query states and result files. For large results with long downloads, Druid periodically resets the retention period during the download to prevent premature cleanup of results.

Async download uses the following Coordinator duties to clean up query states and result files:

  • killAsyncQueryMetadata
  • killAsyncQueryResultWithoutMetadata
  • updateStaleQueryState

When you turn on async download, Druid enables these duties automatically. Configure these duties through the following properties.

  • druid.query.async.cleanup.timeToRetain - Retention period for query states and result files. Supports the ISO 8601 duration format. Required: no. Default: PT60S.
  • druid.query.async.cleanup.timeToWaitAfterBrokerGone - Duration to wait after the Coordinator service detects a missing Broker. If a Broker stays offline for longer than this duration, the Coordinator marks the state of queries running on that Broker as UNDETERMINED. Supports the ISO 8601 duration format. Required: no. Default: PT1M.
  • druid.query.async.cleanup.pollPeriod - Run period for the duty group. Must be in the form PT{n}S, where n is the number of seconds. Required: no. Default: PT30S.
  • druid.query.async.readRefreshTime - How often to reset timeToRetain while an async query result is being read. Prevents premature cleanup of query result files during long downloads. Must be greater than or equal to 1 second. Supports the ISO 8601 duration format. Required: no. Default: PT10S.

The Coordinator runs the cleanup duties for query states and results periodically, according to druid.query.async.cleanup.pollPeriod. The poll period affects the time-based configuration of the duties. For example, if you set druid.query.async.cleanup.pollPeriod to PT30S, the updateStaleQueryState duty checks whether each query state has expired every 30 seconds. Even if you set druid.query.async.cleanup.timeToWaitAfterBrokerGone to PT40S, you may wait up to 60 seconds (two cycles of the poll period) until your query actually expires.
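
For example, the following common.runtime.properties snippet uses illustrative values that retain results for ten minutes and poll for expired state every minute:

# Retain query states and result files for 10 minutes
druid.query.async.cleanup.timeToRetain=PT600S
# Mark queries on a missing Broker as UNDETERMINED after 2 minutes
druid.query.async.cleanup.timeToWaitAfterBrokerGone=PT2M
# Run the cleanup duty group every 60 seconds
druid.query.async.cleanup.pollPeriod=PT60S
# Reset timeToRetain every 15 seconds while a result is being downloaded
druid.query.async.readRefreshTime=PT15S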

Query execution limits

To limit query execution with async downloads, set the following properties.

  • druid.query.async.maxConcurrentQueries - Maximum number of active queries that can run concurrently in each Broker. The Broker queues any queries that exceed the active limit. Required: no. Default: 10% of the number of physical cores in the Broker.
  • druid.query.async.maxQueriesToQueue - Maximum number of queries to store in the Broker queue. The Broker rejects any queries that exceed the limit. Required: no. Default: the greater of 10 or three times maxConcurrentQueries.

We strongly recommend that you also set druid.broker.http.maxQueuedBytes in broker/runtime.properties when using async downloads. maxQueuedBytes limits the maximum size of intermediate results temporarily stored in Broker memory. When intermediate results reach the defined limit, the Broker applies backpressure to Historicals, pausing delivery of more results until the Broker has processed its existing results. This backpressure prevents out of memory (OOM) errors, so the Broker can fetch even very large intermediate results from data servers without memory issues. In general, set the maximum number of bytes to approximately 2 MiB times the number of Historicals. See Broker configuration for more information on druid.broker.http.maxQueuedBytes and Basic cluster tuning for details on the recommended setting.
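
For example, the following snippets show illustrative values for a hypothetical cluster with 25 Historicals, sizing maxQueuedBytes at roughly 2 MiB per Historical:

# common.runtime.properties: limit async query execution on each Broker
druid.query.async.maxConcurrentQueries=4
druid.query.async.maxQueriesToQueue=20

# broker/runtime.properties: cap intermediate results buffered in Broker memory
# 2 MiB x 25 Historicals = 52428800 bytes
druid.broker.http.maxQueuedBytes=52428800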

Async downloads rely on synchronous calls between the Historical and the Broker within Druid. These calls may time out for a long query, causing your query to fail. See Troubleshooting query execution for more information.

Cluster recommendations

Each Druid query consumes a certain amount of cluster resources, such as processing threads, memory buffers for intermediate query results, and HTTP threads for communicating between Brokers and data servers. Async download introduces mixed workloads to Druid in which long-running async queries run concurrently with short-running, interactive queries. Async queries typically return large results and consume more resources than short-running, "light" queries. As a result, async queries can throttle the performance of light interactive queries. For example, if you run both kinds of queries in the same Druid cluster, async queries may occupy all available HTTP threads. This slows down subsequent queries, both heavy and light, and triggers timeout errors for the later queries.

To improve query concurrency and reduce the impact on Druid's performance, we strongly recommend a separate Broker tier to handle async downloads. Broker tiering allows you to isolate cluster resources between different workloads. To set up Broker tiering, see Configure Druid for mixed workloads.

Query-specific properties

Druid translates SQL queries into native Druid queries, which have default limits on query resource usage. When executed via async downloads, certain queries may exceed their default limits and fail to execute. This section describes the properties you may want to adjust for groupBy native queries, Scan native queries, and large subqueries.

For all queries, also consider tuning the properties below to allow long-running queries and long-running downloads.

The following properties configure long-running query execution.

  • Set druid.server.http.defaultQueryTimeout to the maximum length of time you want queries to run. Configure this value in the runtime.properties file for the Broker, Historical, and Overlord. The default value is 300000 milliseconds, or 5 minutes. You cannot submit any async queries when the default query timeout is set to 0.

  • Set druid.server.http.maxQueryTimeout to the maximum allowed value for the timeout parameter in the query context. Configure this value in the runtime.properties file for the Broker, Historical, and Overlord. Druid rejects a query if timeout in the query context is greater than this value.

The following properties configure long-running query result downloads.

  • Set druid.broker.http.readTimeout to the time allotted to the Broker to read data from Historical servers and real-time tasks. Configure this value in the runtime.properties file for the Broker.

  • Set druid.router.http.readTimeout to the time allotted to the Router to read data from Broker services. Configure this value in the runtime.properties file for the Router.
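
As an illustration, the following snippets raise these limits to allow queries and downloads of up to 30 minutes. The values are examples only; tune them to your workload and hardware:

# Broker, Historical, and Overlord runtime.properties
# Allow queries to run for up to 30 minutes by default (in milliseconds)
druid.server.http.defaultQueryTimeout=1800000
# Reject queries whose context requests a timeout longer than 1 hour
druid.server.http.maxQueryTimeout=3600000

# Broker runtime.properties
druid.broker.http.readTimeout=PT30M

# Router runtime.properties
druid.router.http.readTimeout=PT30M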

Visit the Configuration reference for more information.

GroupBy queries

  • druid.processing.numMergeBuffers - limits the number of direct memory buffers available for merging query results. This property effectively caps the number of concurrent groupBy v2 queries. The default value is max(2, druid.processing.numThreads / 4). If you submit more groupBy v2 queries than numMergeBuffers on a node, later groupBy v2 queries must wait until merge buffers become available. Since groupBy v2 queries can run for a long duration with async downloads, reduce the wait for merge buffers by increasing numMergeBuffers to run more groupBy v2 queries concurrently. Before adding more merge buffers, consider the total memory of your machine as well as the size of your Linux disk cache.

  • druid.query.groupBy.maxMergingDictionarySize - limits the heap space when merging per-segment results when grouping on strings. The default value is 100 MB. While a higher value may avoid disk spilling and speed up individual queries, the higher heap memory usage can negatively impact query concurrency. We recommend leaving the default value as is. The maximum total memory usage in a node is equal to maxMergingDictionarySize times druid.processing.numMergeBuffers.

  • druid.query.groupBy.maxOnDiskStorage - limits the maximum amount of disk space each query can use if it needs to spill result sets to disk. The default value is 0. Set maxOnDiskStorage to 1000000000 (1 GB) to enable aggregating results on disk. A higher value allows queries to run even when they return large results that do not fit in memory. The maximum total disk space usage on a node is equal to maxOnDiskStorage times druid.processing.numMergeBuffers.
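
For example, a runtime.properties sketch along these lines increases the number of merge buffers and enables spilling to disk. The values are illustrative and depend on your hardware:

# Allow more concurrent groupBy v2 queries to merge results
druid.processing.numMergeBuffers=8
# Let each query spill up to 1 GB of intermediate results to disk
druid.query.groupBy.maxOnDiskStorage=1000000000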

See more details at GroupBy queries.

Scan queries

Consider your hardware specifications and the expected dimensions of your queries before setting the following property. Consult your Imply representative for assistance if you plan to modify it.

  • druid.query.scan.maxSegmentPartitionsOrderedInMemory - limits the number of segments scanned by each Historical when you request time ordering in your Scan query. This value constrains the off-heap memory used by the query. To run larger Scan queries, increase maxSegmentPartitionsOrderedInMemory. When setting this value, consider your cluster size, the number of segments to read, and the number of columns in the query. For all queries, the off-heap memory usage can be as high as the product of the following factors:
    • maxSegmentPartitionsOrderedInMemory
    • The decompression buffer size, 64 KB
    • The number of decompression buffers, typically 4 when scanning over string columns
    • The number of columns read per query
    • The number of concurrent Scan queries
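
As a rough, illustrative estimate: with maxSegmentPartitionsOrderedInMemory set to 50, 64 KB decompression buffers, 4 buffers per column, 10 columns read per query, and 4 concurrent Scan queries, the off-heap usage on each Historical could reach approximately 50 x 64 KB x 4 x 10 x 4 = 500 MB.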

See more details at Scan queries.

Subqueries

Consider your hardware specifications and the expected dimensions of your queries before setting the following property. Consult your Imply representative for assistance if you plan to modify it.

  • druid.server.http.maxSubqueryRows - limits the maximum number of rows from all subqueries belonging to a query. Raise the subquery row limit if you have large subqueries. However, you must also increase the heap size of the Brokers, Historicals, and task Peons to accommodate the subquery results. For all queries, the maximum heap usage is equal to maxSubQueryRows times the number of concurrent queries that have subqueries.
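
For example, to raise the limit to 200,000 rows, you might set the following in the runtime.properties for the Broker, Historicals, and task Peons, and increase their heap sizes accordingly. The value is illustrative only:

# Allow up to 200,000 rows across all subqueries of a single query
druid.server.http.maxSubqueryRows=200000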

See more details at Server configuration.

Metrics

Broker metrics

  • async/result/tracked/count: number of queries tracked by Druid.
  • async/result/tracked/bytes: total size of query results tracked by Druid.
  • async/sqlQuery/running/count: number of running queries.
  • async/sqlQuery/queued/count: number of queued queries.
  • async/sqlQuery/running/max: max number of running queries. Must be the same as druid.query.async.maxConcurrentQueries.
  • async/sqlQuery/queued/max: max number of queued queries. Must be the same as druid.query.async.maxQueriesToQueue.

Coordinator metrics

  • async/cleanup/result/removed/count: number of query result files successfully deleted in each Coordinator run.
  • async/cleanup/result/failed/count: number of failed attempts to delete query results in each Coordinator run.
  • async/cleanup/metadata/removed/count: number of query states successfully cleaned up in each Coordinator run.
  • async/cleanup/metadata/failed/count: number of failed attempts to clean up query states in each Coordinator run.
  • async/cleanup/metadata/skipped/count: number of unexpired query states in each Coordinator run.
  • async/query/undetermined/count: number of queries marked as UNDETERMINED during each druid.query.async.cleanup.pollPeriod.

Examples

These examples use the Wikipedia data set introduced in the Quickstart. Before trying these examples, enable async download and specify the location to store your results. See Setup for details.

Submit a query

To submit a new query, send the query as a raw JSON string in a POST request:

curl --location --request POST 'http://localhost:8888/druid/v2/sql/async/' \
--header 'Content-Type: application/json' \
--data-raw '{"query" : "SELECT COUNT(*) FROM wikipedia"}'

Output:

{
  "asyncResultId": "84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg",
  "state": "INITIALIZED"
}

Get query status

To get the status of your query, send a GET request and pass in the query ID as a path parameter. The query ID is located in the field named "asyncResultId".

curl --location --request GET \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg/status'

Output:

{
  "asyncResultId": "84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg",
  "state": "COMPLETE",
  "resultFormat": "object",
  "resultLength": 19
}

If you get a 404 Not Found error, your query may have expired. Fetch the query status within the retention period, or configure the query retention period using the druid.query.async.cleanup.timeToRetain property.

Get query results

To get the results of your query, send a GET request and pass in the query ID as a path parameter.

curl --location --request GET \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg/results'

Output:

[
  {
    "Counts": 24433
  }
]

If you get a 404 Not Found error, your query may have expired. Fetch the query result within the retention period, or configure the query retention period using the druid.query.async.cleanup.timeToRetain property.

Delete query

To delete your query, send a DELETE request and pass in the query ID as a path parameter.

curl --location --request DELETE \
'http://localhost:8888/druid/v2/sql/async/84bfb30485964b27bab31aed9fe5de01_088bdc25-a4ba-4a53-bd12-71e0439b6450_jmhfbopg'

Druid returns an HTTP 202 response when the request is accepted. Note that Druid automatically deletes a query once its retention period expires.
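
The following shell sketch strings these calls together: it submits a query, polls the status endpoint until the query reaches a terminal state, downloads the results, and then deletes the query. It assumes curl and jq are installed and that the Router listens on localhost:8888:

ASYNC_URL='http://localhost:8888/druid/v2/sql/async'

# Submit the query and capture the asyncResultId from the response
ID=$(curl -s -X POST "$ASYNC_URL/" \
  --header 'Content-Type: application/json' \
  --data-raw '{"query" : "SELECT COUNT(*) FROM wikipedia"}' | jq -r '.asyncResultId')

# Poll the status endpoint until the query completes, fails, or becomes undetermined
while true; do
  STATE=$(curl -s "$ASYNC_URL/$ID/status" | jq -r '.state')
  case "$STATE" in
    COMPLETE|FAILED|UNDETERMINED) break ;;
  esac
  sleep 5
done

# Download the results, then delete the query to release its result file
if [ "$STATE" = "COMPLETE" ]; then
  curl -s "$ASYNC_URL/$ID/results" -o results.json
fi
curl -s -X DELETE "$ASYNC_URL/$ID"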

Known issues

  • When the Broker shuts down gracefully, it marks all queries running in it as FAILED before it completely shuts down. However, the Broker may fail before it updates all the query states, leaving some query states unchanged. In this case, you must manually clean up the queries left in invalid states.
  • Even when the Broker shuts down gracefully, it currently doesn't wait for running queries to complete before it completely shuts down. Instead, it simply marks them FAILED. This can cause your async queries to fail during rolling updates.

Learn more

See the following topics for more information: