Ingest using SQL by API
You can use SQL to define batch ingestion jobs in Imply Polaris. With SQL-based ingestion, you describe the source data, destination table, and any data transformations in a single SQL statement. SQL-based ingestion is an alternative approach to using the ingestion job spec to create your batch ingestion job.
This topic shows you how to submit a SQL-based ingestion job using the API.
Prerequisites
This topic assumes that you have the following:
- A batch ingestion source.
- An API key with the `ManageIngestionJobs` permission. In the examples below, the key value is stored in the variable named `POLARIS_API_KEY`. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.
You do not have to create a table before starting an ingestion job. When you set `createTableIfNotExists` to `true` in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
Submit SQL-based ingestion job
Send a `POST` request to the `/v1/projects/PROJECT_ID/jobs` endpoint to create a SQL-based ingestion job.
Define a SQL-based ingestion job with the following properties:
- `type`: Set the job type to `sql`.
- `query`: The SQL statement that describes the ingestion job. For reference on syntax and example queries, see Ingest using SQL. Ensure that quoted fields in the SQL statement are properly escaped so that the request payload is valid JSON.
  You can view the SQL for previous ingestion jobs even if they weren't `sql`-type ingestion jobs. Polaris returns this information in the `query` property of the Get job details endpoint.
- `context`: Optionally set context parameters to configure query planning. For more information, see Context parameters.
- `parameters`: If your query is a parameterized SQL query, use this property to specify the data types and values for the dynamic parameters. For an example, see Submit parameterized SQL ingestion.
See the Jobs v1 API documentation for more information.
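One way to satisfy the escaping requirement for `query` is to build the request body programmatically rather than hand-escaping quotes. The following illustrative sketch (the table and file names are just examples from this page, not fixed API values) shows `json.dumps` producing the `\"` escapes for you:

```python
import json

# Illustrative sketch: build the request body with json.dumps instead of
# hand-escaping quotes in the SQL statement.
query = (
    'INSERT INTO "example_table1" '
    'SELECT TIME_PARSE("timestamp") AS __time, * '
    "FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) "
    'EXTEND("timestamp" VARCHAR, "channel" VARCHAR) PARTITIONED BY DAY'
)

payload = json.dumps({"type": "sql", "createTableIfNotExists": True, "query": query})

# json.dumps emits \" for every double quote inside the SQL statement,
# so the payload is valid JSON and round-trips back to the original query.
assert json.loads(payload)["query"] == query
```

The resulting `payload` string can be sent as the request body, as in the samples below.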
Sample request
The following example creates a SQL-based ingestion job to ingest data into `example_table1` from an uploaded file named `wikipedia.json`:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Content-Type: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
"type": "sql",
"createTableIfNotExists": true,
"query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY"
}'
import requests
import os
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "sql",
"createTableIfNotExists": True,
"query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY"
})
headers = {
'Content-Type': 'application/json',
'Authorization': f'Basic {apikey}',
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a successful response:
{
"query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY",
"context": {
"mode": "nonStrict",
"sqlQueryId": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": true,
"durableShuffleStorage": true,
"catalogValidationEnabled": false,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"parameters": [],
"target": {
"tableName": "example_table1",
"type": "table",
"intervals": null
},
"createdBy": {
"username": "api-key-pok_6zg8n...w6q06h",
"userId": "10a07151-cf3d-4aa2-xxxx-xxxxxxxxxxxx"
},
"createdTimestamp": "2024-07-02T21:43:09.056885Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
"lastModifiedBy": {
"username": "api-key-pok_6zg8n...w6q06h",
"userId": "10a07151-cf3d-4aa2-xxxx-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2024-07-02T21:43:09.056885Z",
"spec": {
"query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY",
"context": {
"mode": "nonStrict",
"sqlQueryId": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": true,
"durableShuffleStorage": true,
"catalogValidationEnabled": false,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"createTableIfNotExists": true,
"parameters": [],
"type": "sql"
},
"type": "sql",
"completedTimestamp": null,
"startedTimestamp": null
}
Submit parameterized SQL ingestion
Parameterizing SQL allows you to pass parameters into a query rather than use constant values. The following example builds upon the data ingested in the previous section to ingest data into `example_table2` from `example_table1` with time, text, and numeric filters. It creates a parameterized SQL-based ingestion job that uses placeholders, indicated by `?` within the SQL query, to represent values. The job passes in ordered parameter values using the `parameters` property. The `parameters` property is an array of objects, where each object contains a `type` and a `value` property. The order of the array elements indicates the order in which the parameters replace the `?` placeholders in the SQL query.
INSERT INTO "example_table2"
SELECT *
FROM "example_table1"
WHERE __time < ? AND
channel = ? AND
added >= ?
PARTITIONED BY DAY
The complete request body for the parameterized query is as follows:
{
"type": "sql",
"createTableIfNotExists": true,
"query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
"parameters": [
{
"type": "TIMESTAMP",
"value": "2016-06-27T00:10:00"
},
{
"type": "VARCHAR",
"value": "#en.wikipedia"
},
{
"type": "INTEGER",
"value": 500
}
]
}
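Because parameter binding is positional, a quick sanity check before submitting the job is to compare the number of `?` placeholders with the number of entries in `parameters`. This illustrative sketch uses the request body above; note that the naive `count("?")` check assumes the query contains no literal question marks:

```python
import json

# The request body from above; parameters bind to ? placeholders in order.
request_body = json.loads("""
{
  "type": "sql",
  "createTableIfNotExists": true,
  "query": "INSERT INTO \\"example_table2\\" SELECT * FROM \\"example_table1\\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
  "parameters": [
    {"type": "TIMESTAMP", "value": "2016-06-27T00:10:00"},
    {"type": "VARCHAR", "value": "#en.wikipedia"},
    {"type": "INTEGER", "value": 500}
  ]
}
""")

# Naive positional check: one parameter object per ? placeholder.
placeholders = request_body["query"].count("?")
assert placeholders == len(request_body["parameters"])

for position, param in enumerate(request_body["parameters"], start=1):
    print(f"placeholder {position}: {param['type']} = {param['value']}")
```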
Sample request
The following example shows how to ingest data from another table using a parameterized query that filters on time, channel, and added values:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Content-Type: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
"type": "sql",
"createTableIfNotExists": true,
"query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
"parameters": [
{
"type": "TIMESTAMP",
"value": "2016-06-27T00:10:00"
},
{
"type": "VARCHAR",
"value": "#en.wikipedia"
},
{
"type": "INTEGER",
"value": 500
}
]
}'
import requests
import os
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "sql",
"createTableIfNotExists": True,
"query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
"parameters": [
{
"type": "TIMESTAMP",
"value": "2016-06-27T00:10:00"
},
{
"type": "VARCHAR",
"value": "#en.wikipedia"
},
{
"type": "INTEGER",
"value": 500
}
]
})
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Basic {apikey}',
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
The following example shows a successful response:
{
"query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
"context": {
"mode": "nonStrict",
"sqlQueryId": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": true,
"durableShuffleStorage": true,
"catalogValidationEnabled": false,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"parameters": [
{
"type": "TIMESTAMP",
"value": "2016-06-27T00:10:00"
},
{
"type": "VARCHAR",
"value": "#en.wikipedia"
},
{
"type": "INTEGER",
"value": 500
}
],
"target": {
"tableName": "example_table2",
"type": "table",
"intervals": null
},
"createdBy": {
"username": "api-key-pok_6zg8n...w6q06h",
"userId": "10a07151-cf3d-xxxx-xxxx-xxxxxxxxxxxx"
},
"createdTimestamp": "2024-07-03T00:11:16.914513Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
"lastModifiedBy": {
"username": "api-key-pok_6zg8n...w6q06h",
"userId": "10a07151-cf3d-xxxx-xxxx-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2024-07-03T00:11:16.914513Z",
"spec": {
"query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
"context": {
"mode": "nonStrict",
"sqlQueryId": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": true,
"durableShuffleStorage": true,
"catalogValidationEnabled": false,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"createTableIfNotExists": true,
"parameters": [
{
"type": "TIMESTAMP",
"value": "2016-06-27T00:10:00"
},
{
"type": "VARCHAR",
"value": "#en.wikipedia"
},
{
"type": "INTEGER",
"value": 500
}
],
"type": "sql"
},
"type": "sql",
"completedTimestamp": null,
"startedTimestamp": null
}
Learn more
For reference on SQL syntax as well as examples of queries for SQL-based ingestion, see Ingest using SQL.