Ingest using SQL by API

You can use SQL to define batch ingestion jobs in Imply Polaris. With SQL-based ingestion, you describe the source data, destination table, and any data transformations in a single SQL statement. SQL-based ingestion is an alternative approach to using the ingestion job spec to create your batch ingestion job.

This topic shows you how to submit a SQL-based ingestion job using the API.

info

Project-less regional API resources have been deprecated and will be removed by the end of September 2024. See Migrate to project-scoped URL for more information.

Prerequisites

This topic assumes that you have the following:

  • A batch ingestion source.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in an environment variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.
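
For example, you can store the key in an environment variable so that the requests in this topic can reference it. This is a minimal sketch; the key value shown is a placeholder:

export POLARIS_API_KEY="pok_your_api_key"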

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Submit SQL-based ingestion job

Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a SQL-based ingestion job. Define a SQL-based ingestion job with the following properties:

  • type: Set the job type to sql.

  • query: The SQL statement that describes the ingestion job. For reference on syntax and to see example queries, see Ingest using SQL. Ensure that quotation marks within the SQL statement are properly escaped so that the request payload is valid JSON, as shown in the sketch after this list.

info

You can view the SQL for previous ingestion jobs even if they weren't sql-type ingestion jobs. Polaris returns this information in the query property of the Get job details endpoint.

  • context: You can optionally set context parameters to configure query planning. For more information, see Context parameters.

  • parameters: If your query is a parameterized SQL query, use this property to specify the data types and values for the dynamic parameters. For an example, see Submit parameterized SQL ingestion.
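
Taken together, a minimal request body combining these properties might look like the following sketch. The table name my_table, the file my_file.json, and the column definitions are hypothetical placeholders, and maxNumTasks stands in for any context parameter you might set:

{
  "type": "sql",
  "createTableIfNotExists": true,
  "query": "INSERT INTO \"my_table\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['my_file.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR) PARTITIONED BY DAY",
  "context": {
    "maxNumTasks": 4
  },
  "parameters": []
}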

See the Jobs v1 API documentation for more information.
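
For example, to retrieve the details of a previously submitted job, including its query property, you can call the Get job details endpoint. The following is a sketch, where JOB_ID is a placeholder for the ID of an existing job:

curl --location --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/JOB_ID" \
--header "Authorization: Basic $POLARIS_API_KEY"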

Sample request

The following example creates a SQL-based ingestion job to ingest data into example_table1 from an uploaded file called wikipedia.json:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Content-Type: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
  "type": "sql",
  "createTableIfNotExists": true,
  "query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['\''wikipedia.json'\''], format => '\''json'\'')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY"
}'
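
Note that single quotes inside the SQL statement, such as those around wikipedia.json, are written as '\'' so that they survive the surrounding single-quoted --data string in the shell.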

Sample response

The following example shows a successful response:

{
  "query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY",
  "context": {
    "mode": "nonStrict",
    "sqlQueryId": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
    "maxNumTasks": 75,
    "faultTolerance": true,
    "taskAssignment": "auto",
    "maxParseExceptions": 2147483647,
    "finalizeAggregations": true,
    "durableShuffleStorage": true,
    "catalogValidationEnabled": false,
    "clusterStatisticsMergeMode": "SEQUENTIAL",
    "groupByEnableMultiValueUnnesting": false
  },
  "parameters": [],
  "target": {
    "tableName": "example_table1",
    "type": "table",
    "intervals": null
  },
  "createdBy": {
    "username": "api-key-pok_6zg8n...w6q06h",
    "userId": "10a07151-cf3d-4aa2-xxxx-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2024-07-02T21:43:09.056885Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
  "lastModifiedBy": {
    "username": "api-key-pok_6zg8n...w6q06h",
    "userId": "10a07151-cf3d-4aa2-xxxx-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2024-07-02T21:43:09.056885Z",
  "spec": {
    "query": "INSERT INTO \"example_table1\" SELECT TIME_PARSE(\"timestamp\") AS __time, * FROM TABLE(POLARIS_UPLOADED(files => ARRAY['wikipedia.json'], format => 'json')) EXTEND(\"timestamp\" VARCHAR, \"channel\" VARCHAR, \"isRobot\" VARCHAR, \"added\" INTEGER) PARTITIONED BY DAY",
    "context": {
      "mode": "nonStrict",
      "sqlQueryId": "01907566-ca00-72de-xxxx-xxxxxxxxxxxx",
      "maxNumTasks": 75,
      "faultTolerance": true,
      "taskAssignment": "auto",
      "maxParseExceptions": 2147483647,
      "finalizeAggregations": true,
      "durableShuffleStorage": true,
      "catalogValidationEnabled": false,
      "clusterStatisticsMergeMode": "SEQUENTIAL",
      "groupByEnableMultiValueUnnesting": false
    },
    "createTableIfNotExists": true,
    "parameters": [],
    "type": "sql"
  },
  "type": "sql",
  "completedTimestamp": null,
  "startedTimestamp": null
}
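
The executionStatus of pending indicates that the job has not started running yet. As a sketch, assuming the Get job details endpoint and the jq utility are available, you could poll the job status until the job completes. JOB_ID is a placeholder for the id value returned above:

curl --silent --request GET "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/JOB_ID" \
--header "Authorization: Basic $POLARIS_API_KEY" | jq -r '.executionStatus'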

Submit parameterized SQL ingestion

Parameterizing SQL allows you to pass values into a query rather than hard-code constants. The following example builds upon the data ingested in the previous section to ingest data from example_table1 into example_table2 with time, text, and numeric filters. It creates a parameterized SQL-based ingestion job that uses placeholders, indicated by ? within the SQL query, to represent values. The job passes in ordered parameter values using the parameters property, an array of objects that each contain a type and a value property. The order of the array elements indicates the order in which the parameters replace the ? characters in the SQL query.

INSERT INTO "EXAMPLE_TABLE_2"
SELECT *
FROM "EXAMPLE_TABLE_1"
WHERE __time < ? AND
channel = ? AND
added >= ?
PARTITIONED BY DAY

The complete request body for the parameterized query is as follows:

{
  "type": "sql",
  "createTableIfNotExists": true,
  "query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
  "parameters": [
    {
      "type": "TIMESTAMP",
      "value": "2016-06-27T00:10:00"
    },
    {
      "type": "VARCHAR",
      "value": "#en.wikipedia"
    },
    {
      "type": "INTEGER",
      "value": 500
    }
  ]
}

Sample request

The following example shows how to ingest data from another table, filtering the source rows by timestamp, text, and numeric parameters:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Content-Type: application/json" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--data '{
  "type": "sql",
  "createTableIfNotExists": true,
  "query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
  "parameters": [
    {
      "type": "TIMESTAMP",
      "value": "2016-06-27T00:10:00"
    },
    {
      "type": "VARCHAR",
      "value": "#en.wikipedia"
    },
    {
      "type": "INTEGER",
      "value": 500
    }
  ]
}'

Sample response

The following example shows a successful response:

{
  "query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
  "context": {
    "mode": "nonStrict",
    "sqlQueryId": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
    "maxNumTasks": 75,
    "faultTolerance": true,
    "taskAssignment": "auto",
    "maxParseExceptions": 2147483647,
    "finalizeAggregations": true,
    "durableShuffleStorage": true,
    "catalogValidationEnabled": false,
    "clusterStatisticsMergeMode": "SEQUENTIAL",
    "groupByEnableMultiValueUnnesting": false
  },
  "parameters": [
    {
      "type": "TIMESTAMP",
      "value": "2016-06-27T00:10:00"
    },
    {
      "type": "VARCHAR",
      "value": "#en.wikipedia"
    },
    {
      "type": "INTEGER",
      "value": 500
    }
  ],
  "target": {
    "tableName": "example_table2",
    "type": "table",
    "intervals": null
  },
  "createdBy": {
    "username": "api-key-pok_6zg8n...w6q06h",
    "userId": "10a07151-cf3d-xxxx-xxxx-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2024-07-03T00:11:16.914513Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
  "lastModifiedBy": {
    "username": "api-key-pok_6zg8n...w6q06h",
    "userId": "10a07151-cf3d-xxxx-xxxx-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2024-07-03T00:11:16.914513Z",
  "spec": {
    "query": "INSERT INTO \"example_table2\" SELECT * FROM \"example_table1\" WHERE __time < ? AND channel = ? AND added >= ? PARTITIONED BY DAY",
    "context": {
      "mode": "nonStrict",
      "sqlQueryId": "019075ee-6832-xxxx-xxxx-xxxxxxxxxxxx",
      "maxNumTasks": 75,
      "faultTolerance": true,
      "taskAssignment": "auto",
      "maxParseExceptions": 2147483647,
      "finalizeAggregations": true,
      "durableShuffleStorage": true,
      "catalogValidationEnabled": false,
      "clusterStatisticsMergeMode": "SEQUENTIAL",
      "groupByEnableMultiValueUnnesting": false
    },
    "createTableIfNotExists": true,
    "parameters": [
      {
        "type": "TIMESTAMP",
        "value": "2016-06-27T00:10:00"
      },
      {
        "type": "VARCHAR",
        "value": "#en.wikipedia"
      },
      {
        "type": "INTEGER",
        "value": 500
      }
    ],
    "type": "sql"
  },
  "type": "sql",
  "completedTimestamp": null,
  "startedTimestamp": null
}

Learn more

For reference on SQL syntax as well as examples of queries for SQL-based ingestion, see Ingest using SQL.