Filter data to ingest by API
When ingesting data into a table, you can define a filter expression to limit the ingested data to records that satisfy some condition. Specify conditions using SQL syntax for a WHERE clause. You can also filter records during ingestion using the UI.
Before following along with this topic, familiarize yourself with Polaris tables and ingestion jobs.
Prerequisites
To complete the steps in this topic, you need:
- Data in the file shows.csv. Upload this file to your Polaris staging area. To upload files using the Files API, see Upload files.
- An API key with the ManageTables and ManageIngestionJobs permissions. In the examples below, the API key value is read from the environment variable POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.
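If you use the Python examples, you can verify the environment variable before sending any requests. The following is a minimal sketch, assuming the key is exported as POLARIS_API_KEY:
import os

# The examples in this topic read the API key from the POLARIS_API_KEY
# environment variable. Fail fast if it isn't set.
apikey = os.getenv("POLARIS_API_KEY")
if not apikey:
    raise RuntimeError("Set the POLARIS_API_KEY environment variable first.")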
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
Ingest data based on a filter expression
Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create an ingestion job. Filter records using the filterExpression field of the request payload. The filter expression accepts SQL WHERE clause syntax; don't include the WHERE keyword in the expression itself.
For example, to ingest only records where minutes_watched is greater than 10, set the following filter expression in the request payload:
"filterExpression": "\"minutes_watched\" > 10"
See the Jobs v1 API documentation for a description of required parameters.
Sample request
The following example request ingests data from the shows.csv file where "minutes_watched" > 10:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"minutes_watched\" > 10"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": True,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"minutes_watched\" > 10"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
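After you submit the job, you can poll its status until it finishes. The following sketch continues the Python example above; it assumes the Jobs v1 API serves individual jobs at /v1/projects/PROJECT_ID/jobs/JOB_ID and that executionStatus eventually leaves the pending and running states. See the Jobs v1 API documentation for the exact endpoint and status values.
import time

# Hypothetical polling loop: read the job ID from the creation response,
# then poll the job detail endpoint until the status is terminal.
job_id = response.json()["id"]
while True:
    job = requests.get(f"{url}/{job_id}", headers=headers).json()
    print(job.get("executionStatus"))
    if job.get("executionStatus") not in ("pending", "running"):
        break
    time.sleep(5)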
Sample response
A successful request returns a 201 Created response and the details of the ingestion job.
{
"type": "batch",
"id": "e208821f-7097-4273-a003-fae147d55847",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-12-09T19:16:42.748039903Z",
"lastUpdatedTimestamp": "2022-12-09T19:16:42.748039903Z",
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"formatSettings": {
"format": "csv",
"skipHeaderRows": 0
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
]
}
The example data has 19 rows. Polaris ingests only 13 rows based on the values of minutes_watched.
Ingest data limited to a specific time interval
Filter expressions can also limit the ingested data by time. For example:
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'"
For additional examples, see the timestamp filter examples.
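To bound both ends of a time interval, combine two comparisons in the filter expression. For example, the following filter keeps only source records from 2022-05-22 and 2022-05-23:
"filterExpression": "\"date\" >= TIMESTAMP '2022-05-22 00:00:00' AND \"date\" < TIMESTAMP '2022-05-24 00:00:00'"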
Do not use CURRENT_TIMESTAMP or CURRENT_DATE in a filter expression, since Polaris translates the function to a static timestamp when the job begins. This timestamp does not update over time.
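If you need a rolling cutoff, compute the timestamp literal yourself each time you submit the job instead of relying on CURRENT_TIMESTAMP. A minimal sketch in Python:
from datetime import datetime, timedelta, timezone

# Compute a fixed cutoff at submission time. The resulting literal is
# embedded in the filter expression, so it stays static for the job.
cutoff = (datetime.now(timezone.utc) - timedelta(days=2)).strftime("%Y-%m-%d %H:%M:%S")
filter_expression = f"\"date\" > TIMESTAMP '{cutoff}'"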
For streaming ingestion jobs, you can set the earliest or latest rejection periods.
Time intervals also apply when you replace data in a table. The replacement time interval refers to data that already exists in the table. Note that the time interval specified in the filter expression applies to the source data, not to the existing table data being replaced.
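For illustration, the following hypothetical job spec fragment pairs a replacement interval in the target with a source filter. The field names mirror the sample responses in this topic, but consult the Jobs v1 API documentation for the exact syntax of replace jobs:
"ingestionMode": "replace",
"target": {
  "type": "table",
  "tableName": "Filters demo",
  "intervals": ["2022-05-22T00:00:00Z/2022-05-24T00:00:00Z"]
},
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'"
Here the replacement interval selects which existing table rows are replaced, while the filter expression selects which source rows are ingested.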
Sample request
The example data contains events from 2022-05-19 to 2022-05-23. The following example shows how to ingest only the data from the last two days, 2022-05-22 to 2022-05-23.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"date\" > TIMESTAMP '\''2022-05-21 23:59:59'\''"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": True,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job. When the ingestion job completes, the table should contain eight rows corresponding to the last two days of data.
{
"source": {
"fileList": [
"shows.csv"
],
"formatSettings": {
"columns": [],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"type": "uploaded"
},
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'",
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")",
"isAggregation": null
},
{
"columnName": "uid",
"expression": "\"uid\"",
"isAggregation": null
},
{
"columnName": "show",
"expression": "\"show\"",
"isAggregation": null
},
{
"columnName": "episode",
"expression": "\"episode\"",
"isAggregation": null
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-06-12T22:10:33.400972902Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "21947f85-227c-4c63-b052-1c511bb7091d",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-06-12T22:10:33.400972902Z",
"spec": {
"source": {
"fileList": [
"shows.csv"
],
"formatSettings": {
"columns": [],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"type": "uploaded"
},
"target": {
"tableName": "Filters demo",
"type": "table",
"intervals": []
},
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'",
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")",
"isAggregation": null
},
{
"columnName": "uid",
"expression": "\"uid\"",
"isAggregation": null
},
{
"columnName": "show",
"expression": "\"show\"",
"isAggregation": null
},
{
"columnName": "episode",
"expression": "\"episode\"",
"isAggregation": null
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "Filters demo",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "batch",
"completedTimestamp": null,
"startedTimestamp": null
}
Learn more
- Filter data to ingest for filtering data during ingestion using the UI.
- Jobs v1 API for information on creating and managing ingestion jobs.