Filter data to ingest by API
When ingesting data into a table, you can define a filter expression to limit the ingested data to records that satisfy a condition. Specify the condition using SQL syntax for a WHERE clause. You can also filter records during ingestion using the UI.
Prerequisites
The examples in this topic use the data in the file shows.csv. Upload the data file to your Polaris staging area.
You will also need an API key with the following permissions:
ManageTables
ManageIngestionJobs
In the examples below, Polaris reads the key value from the variable named POLARIS_API_KEY.
See Authenticate with API keys to obtain an API key and assign service account permissions.
For more information on permissions, visit Permissions reference.
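For example, you can store the key in an environment variable with the same name the examples use. The key value shown is a placeholder; substitute your own.

```shell
# Store your Polaris API key in the environment variable
# that the examples in this topic read. Replace the
# placeholder value with your actual API key.
export POLARIS_API_KEY="your_api_key_here"
```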
Replace ORGANIZATION_NAME with the custom domain through which you access Polaris. For example, https://ORGANIZATION_NAME.app.imply.io.
Create a table
Create a table to store the ingested data.
The following request creates a detail table named Filters demo:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "detail",
"name": "Filters demo",
"partitioningGranularity": "day",
"timeResolution": "hour",
"schema": [
{
"name": "__time",
"dataType": "timestamp"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "detail",
"name": "Filters demo",
"partitioningGranularity": "day",
"timeResolution": "hour",
"schema": [
{
"name": "__time",
"dataType": "timestamp"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
Ingest data based on a filter expression
Send a POST request to the /v2/jobs endpoint to create an ingestion job. Filter records using the filterExpression field of the request payload.
The filter expression accepts SQL WHERE clause syntax. Do not include the WHERE keyword in the expression itself.
For example, to ingest only records where minutes_watched is greater than 10, set the following filter expression in the request payload:
"filterExpression": "\"minutes_watched\" > 10"
See the Job v2 API documentation for a description of required parameters.
Sample request
The following example request ingests data from the shows.csv file where "minutes_watched" > 10:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"minutes_watched\" > 10"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"minutes_watched\" > 10"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job.
{
"type": "batch",
"id": "e208821f-7097-4273-a003-fae147d55847",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-12-09T19:16:42.748039903Z",
"lastUpdatedTimestamp": "2022-12-09T19:16:42.748039903Z",
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"formatSettings": {
"format": "csv",
"skipHeaderRows": 0
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
]
}
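You can use the job ID from the response to track the job until it finishes. The following sketch assumes the Jobs v2 API returns job details, including executionStatus, from GET /v2/jobs/{id}; see the Jobs v2 API documentation for the authoritative endpoint and status values. The job ID shown is taken from the sample response above.

```python
import os
import time

BASE_URL = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

def job_status_url(job_id: str) -> str:
    # Build the URL for a single job's details.
    return f"{BASE_URL}/{job_id}"

def wait_for_job(job_id: str, poll_seconds: int = 10) -> str:
    # Poll the job until it leaves the pending/running states, then
    # return the final executionStatus. State names other than those in
    # the sample response ("pending") are assumptions.
    import requests  # imported here so the helpers above work without it
    headers = {"Authorization": f"Basic {os.getenv('POLARIS_API_KEY')}"}
    while True:
        job = requests.get(job_status_url(job_id), headers=headers).json()
        status = job.get("executionStatus")
        if status not in ("pending", "running"):
            return status
        time.sleep(poll_seconds)

if __name__ == "__main__":
    # Job ID from the sample response above.
    print(wait_for_job("e208821f-7097-4273-a003-fae147d55847"))
```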
The example data has 19 rows. Polaris ingests only the 13 rows where minutes_watched is greater than 10.
Learn more
- Create an ingestion job for details on creating ingestion jobs and filtering data using the UI.
- Jobs v2 API for information on creating and managing ingestion jobs.