Ingest data from Azure Blob Storage by API
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest data from Azure Blob Storage.
This topic covers how to create a connection to an Azure Blob Storage container and ingest data from the container into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For an end-to-end guide to Azure Blob Storage ingestion in Polaris, see Guide for Azure Blob Storage ingestion.
Prerequisites
Before you create the connection, complete the following:
- Review Connect to Azure Blob Storage for the required information to create the connection.
- If you don't have one already, create a Polaris API key with the ManageConnections permission. If you plan to create tables or ingestion jobs, you also need ManageTables and ManageIngestionJobs, respectively. For more information on permissions, see Permissions reference. The examples in this topic use a variable named POLARIS_API_KEY to store the API key, as shown after these prerequisites.

You don't have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
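For example, you can store the key in an environment variable so that the cURL and Python samples in this topic can read it. The key value here is a placeholder; replace it with your own API key.

export POLARIS_API_KEY="YOUR_API_KEY"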
Create a connection to Azure Blob Storage
Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.
Each connection is associated with a single container in an Azure storage account.
To ingest data from multiple containers, create a new connection for each one.
Send a POST request to create an Azure Blob Storage connection and supply the following properties in the request body:

- type: Connection type is azure.
- name: Name to identify the Polaris connection. You cannot change this later.
- storageAccount: Name of the storage account in Azure Blob Storage. The storage account must enable public network access or allow access to Imply specifically. See Connection information.
- container: Name of the container in the storage account.
- secrets: Authentication credentials using a SAS token (recommended) or a storage account access key.

  For a SAS token, secrets takes the following form. Replace TOKEN_VALUE with your SAS token.

  "secrets": {
    "type": "sas_token",
    "sasToken": "TOKEN_VALUE"
  }

  For a storage account access key, secrets takes the following form. Replace KEY_VALUE with your access key.

  "secrets": {
    "type": "access_key",
    "accessKeySecret": "KEY_VALUE"
  }

- prefix: Optional property to limit connection access to items with the prefix string. This prefix applies to the scope of the connection, whereas a prefix you supply in an ingestion job applies to the files to ingest from the connection.
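For example, a connection that should only expose blobs under a hypothetical files/2024/ prefix would add the following property to the request body. An ingestion job that uses this connection can then narrow the file selection further with its own prefixes, objects, or pattern values.

"prefix": "files/2024/"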
To learn more about Azure Blob Storage connection requirements, see Azure Blob Storage connection information.
Your request payload should resemble the following:
{
  "type": "azure",
  "name": "azure-connection",
  "storageAccount": "example-account",
  "container": "example-container",
  "secrets": {
    "type": "sas_token",
    "sasToken": "sv=2022-11-02&ss=bfqt&..."
  }
}
Sample request
The following example creates an Azure Blob Storage connection named demo-connection:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "azure",
  "name": "demo-connection",
  "storageAccount": "demo-account",
  "container": "demo-container",
  "secrets": {
    "type": "sas_token",
    "sasToken": "sv=2022-11-02&ss=bfqt&..."
  }
}'
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
  "type": "azure",
  "name": "demo-connection",
  "storageAccount": "demo-account",
  "container": "demo-container",
  "secrets": {
    "type": "sas_token",
    "sasToken": "sv=2022-11-02&ss=bfqt&..."
  }
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the connection:
{
  "storageAccount": "demo-account",
  "container": "demo-container",
  "prefix": null,
  "secrets": {
    "sasToken": null,
    "type": "sas_token"
  },
  "modifiedByUser": {
    "username": "api-key-pok_7udiv...xrujvd",
    "userId": "b6340b70-3f30-4ccd-86a0-fe74ebfc7cbe"
  },
  "modifiedOnTimestamp": "2024-05-31T22:18:47.666287Z",
  "name": "demo-connection",
  "submittedByUser": {
    "username": "api-key-pok_7udiv...xrujvd",
    "userId": "b6340b70-3f30-4ccd-86a0-fe74ebfc7cbe"
  },
  "submittedOnTimestamp": "2024-05-31T22:18:47.666287Z",
  "type": "azure",
  "description": null
}
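To verify the connection, you can fetch it by name. The following sketch assumes the Connections v1 API serves a single connection at /v1/projects/PROJECT_ID/connections/CONNECTION_NAME; check the Connections v1 API documentation for the exact path.

import os
import requests

# Hypothetical sketch: retrieve the connection created above by name.
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections/demo-connection"
apikey = os.getenv("POLARIS_API_KEY")

response = requests.get(url, headers={"Authorization": f"Basic {apikey}"})

# A 200 response returns the same connection details shown in the sample response.
print(response.status_code)
print(response.text)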
Ingest from Azure Blob Storage
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a batch ingestion job. For details on the properties for a batch ingestion job request, see Load data into a table.

In the request body, set the type property of the source object to azure. Don't use the connection type.
In the request body, specify the files from the container to ingest using one of uris, prefixes, objects, or pattern.
For example:
# URIs
"uris": ["azureStorage://storageAccount/container/prefix1/file.json", "azureStorage://storageAccount/container/prefix2/file2.json"]
# Prefixes
"prefixes": ["prefix1/"]
# Objects
"objects": ["prefix1/file1.json", "prefix1/file2.json"]
# Wildcard pattern
"pattern": "prefix1/*.json"
# Recursive wildcard pattern
"pattern": "**.json"
To reference the Azure Blob Storage connection in SQL-based ingestion, use the POLARIS_SOURCE table function. For more information and examples, see Submit SQL-based ingestion job and SQL ingestion reference.
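For example, the following sketch, adapted and simplified from the sample response later in this topic, shows the general shape of a SQL-based ingestion query that reads from the demo-connection connection. The JSON argument to POLARIS_SOURCE mirrors the source object of the batch job spec.

INSERT INTO "Koalas"
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "city" AS "city",
  "session" AS "session"
FROM TABLE(
  POLARIS_SOURCE(
    '{"type":"azure","connectionName":"demo-connection","objects":["data02/kttm-2019-08-21.json.gz"],"formatSettings":{"format":"nd-json"},"inputSchema":[{"dataType":"string","name":"timestamp"},{"dataType":"string","name":"city"},{"dataType":"string","name":"session"}]}'
  )
)
PARTITIONED BY DAY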
Sample request
The following example request creates a batch ingestion job for the Koalas table using the following details:

- Connection named demo-connection created in the previous example
- File named data02/kttm-2019-08-21.json.gz defined in objects
In the request payload, specify the format of the files in formatSettings. Polaris requires all objects in an ingestion job to have the same file type. Create a separate job for each file type to ingest.
See the Jobs v1 API documentation for a complete description of the required parameters.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "Koalas"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "azure",
    "connectionName": "demo-connection",
    "objects": [
      "data02/kttm-2019-08-21.json.gz"
    ],
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "long",
        "name": "session_length"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "Koalas"
  },
  "createTableIfNotExists": True,
  "source": {
    "type": "azure",
    "connectionName": "demo-connection",
    "objects": [
      "data02/kttm-2019-08-21.json.gz"
    ],
    "formatSettings": {
      "format": "nd-json"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "long",
        "name": "session_length"
      }
    ]
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": True
    }
  ]
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"source": {
"connectionName": "demo-connection",
"formatSettings": {
"flattenSpec": null,
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"objects": [
"data02/kttm-2019-08-21.json.gz"
],
"prefixes": null,
"systemFields": [],
"uris": null,
"pattern": null,
"type": "azure"
},
"context": {
"mode": "nonStrict",
"sqlQueryId": "018fd0d8-ec53-7dd8-a42f-ce972348b9a5",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": false,
"durableShuffleStorage": true,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "session",
"expression": "\"session\"",
"isAggregation": null
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"query": "INSERT INTO \"Koalas aggregate\"\nSELECT\n TIME_PARSE(\"timestamp\") AS \"__time\",\n \"city\" AS \"city\",\n \"session\" AS \"session\",\n MAX(\"session_length\") AS \"max_session_length\",\n COUNT(*) AS \"__count\"\nFROM TABLE(\n POLARIS_SOURCE(\n '{\"connectionName\":\"demo-connection\",\"formatSettings\":{\"format\":\"nd-json\"},\"inputSchema\":[{\"dataType\":\"string\",\"name\":\"timestamp\"},{\"dataType\":\"string\",\"name\":\"city\"},{\"dataType\":\"string\",\"name\":\"session\"},{\"dataType\":\"long\",\"name\":\"session_length\"}],\"objects\":[\"data02/kttm-2019-08-21.json.gz\"],\"systemFields\":[],\"type\":\"azure\"}'\n )\n)\n\nGROUP BY 1, 2, 3\nPARTITIONED BY DAY",
"createdBy": {
"username": "api-key-pok_7udiv...xrujvd",
"userId": "b6340b70-3f30-4ccd-86a0-fe74ebfc7cbe"
},
"createdTimestamp": "2024-05-31T22:50:28.307764Z",
"desiredExecutionStatus": "running",
"executionStatus": "failed",
"health": {
"code": "internal_error",
"log": null,
"message": "An internal error occurred. Please retry your request and contact support if the issue persists.",
"status": "error"
},
"id": "018fd0d8-ec53-7dd8-a42f-ce972348b9a5",
"lastModifiedBy": {
"username": "api-key-pok_7udiv...xrujvd",
"userId": "b6340b70-3f30-4ccd-86a0-fe74ebfc7cbe"
},
"lastUpdatedTimestamp": "2024-05-31T22:50:28.307764Z",
"spec": {
"source": {
"connectionName": "demo-connection",
"formatSettings": {
"flattenSpec": null,
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"objects": [
"data02/kttm-2019-08-21.json.gz"
],
"prefixes": null,
"systemFields": [],
"uris": null,
"pattern": null,
"type": "azure"
},
"target": {
"tableName": "Koalas aggregate",
"type": "table",
"intervals": null
},
"context": {
"mode": "nonStrict",
"sqlQueryId": "018fd0d8-ec53-7dd8-a42f-ce972348b9a5",
"maxNumTasks": 75,
"faultTolerance": true,
"taskAssignment": "auto",
"maxParseExceptions": 2147483647,
"finalizeAggregations": false,
"durableShuffleStorage": true,
"clusterStatisticsMergeMode": "SEQUENTIAL",
"groupByEnableMultiValueUnnesting": false
},
"createTableIfNotExists": true,
"filterExpression": null,
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "session",
"expression": "\"session\"",
"isAggregation": null
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)",
"isAggregation": true
}
],
"maxParseExceptions": 2147483647,
"replaceAll": false,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "Koalas aggregate",
"type": "table",
"intervals": null
},
"type": "batch",
"completedTimestamp": "2024-05-31T22:50:28.307764Z",
"startedTimestamp": null
}
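The id field in the response identifies the job for follow-up requests. As a sketch, you can poll the job until it leaves a running state by fetching it from the Jobs v1 API; the status values checked below are assumptions based on the sample response, so confirm them in the Jobs v1 API documentation.

import os
import time
import requests

# Hypothetical sketch: poll the job from the sample response until it reaches a terminal state.
job_id = "018fd0d8-ec53-7dd8-a42f-ce972348b9a5"
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs/{job_id}"
headers = {"Authorization": f"Basic {os.getenv('POLARIS_API_KEY')}"}

while True:
    job = requests.get(url, headers=headers).json()
    status = job.get("executionStatus")
    print(status)
    if status not in ("pending", "running"):  # assumed non-terminal states
        break
    time.sleep(10)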
Learn more
See the following topics for more information:
- Guide for Azure Blob Storage ingestion for an end-to-end guide to Azure Blob Storage ingestion in Polaris.
- Connect to Azure Blob Storage for reference on connecting Polaris to Azure Blob Storage.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.