
Filter data to ingest by API

When ingesting data into a table, you can define expressions to limit the ingested data to records that satisfy some condition. Specify conditions using SQL syntax for a WHERE clause. You can also filter records during ingestion using the UI.

Before following along with this topic, familiarize yourself with the following concepts:

Info: Project-less regional API resources have been deprecated and will be removed by the end of September 2024. See Migrate to project-scoped URL for more information.

Prerequisites

To complete the steps in this topic, you need:

  • Data in the file shows.csv. Upload this file to your Polaris staging area. To upload files using the Files API, visit Upload files.

  • An API key with the ManageTables and ManageIngestionJobs permissions.
    In the examples below, Polaris reads the key value from the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Ingest data based on a filter expression

Send a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create an ingestion job. Filter records using the filterExpression field of the request payload. The filter expression takes the same expressions as a SQL WHERE clause. Don't include the WHERE keyword in the expression itself.

For example, to only ingest records where minutes_watched is greater than 10, set the following filter expression in the request payload:

"filterExpression": "\"minutes_watched\" > 10"

See the Jobs v1 API documentation for a description of required parameters.
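
Because the filter expression is a SQL string embedded in JSON, the double quotes around the column name must be escaped in the payload. The following Python sketch shows one way to avoid escaping by hand: write the expression as a plain string and let json.dumps serialize it. The helper name filter_fragment is hypothetical and is not part of any Polaris client.

```python
import json


def filter_fragment(column: str, operator: str, value: int) -> str:
    """Return a JSON object string that holds only the filterExpression field."""
    # Double-quote the column name, matching the SQL identifiers in the examples.
    expression = f'"{column}" {operator} {value}'
    # json.dumps escapes the inner double quotes for the JSON payload.
    return json.dumps({"filterExpression": expression})


print(filter_fragment("minutes_watched", ">", 10))
# prints {"filterExpression": "\"minutes_watched\" > 10"}
```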

Sample request

The following example request ingests data from the shows.csv file where "minutes_watched" > 10:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"minutes_watched\" > 10"
}'
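
The same request can be assembled in Python using only the standard library. This is a minimal sketch, not an official client: the helper names build_job_spec, build_request, and submit_job are hypothetical, and the URL keeps the placeholders from the curl example, so replace them before sending anything.

```python
import json
import urllib.request

# Placeholder URL copied from the curl example; substitute your own values.
JOBS_URL = (
    "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io"
    "/v1/projects/PROJECT_ID/jobs"
)
COLUMNS = ["date", "uid", "show", "episode", "minutes_watched"]


def build_job_spec(filter_expression: str) -> dict:
    """Assemble the same batch job spec as the curl example."""
    input_schema = [
        {"name": c, "dataType": "long" if c == "minutes_watched" else "string"}
        for c in COLUMNS
    ]
    # __time is parsed from the date column; other columns map to themselves.
    mappings = [{"columnName": "__time", "expression": 'TIME_PARSE("date")'}]
    mappings += [{"columnName": c, "expression": f'"{c}"'} for c in COLUMNS[1:]]
    return {
        "type": "batch",
        "target": {"type": "table", "tableName": "Filters demo"},
        "createTableIfNotExists": True,
        "source": {
            "type": "uploaded",
            "fileList": ["shows.csv"],
            "inputSchema": input_schema,
            "formatSettings": {"format": "csv"},
        },
        "mappings": mappings,
        "filterExpression": filter_expression,
    }


def build_request(spec: dict, api_key: str) -> urllib.request.Request:
    """Wrap the spec in a POST request with the headers from the curl example."""
    return urllib.request.Request(
        JOBS_URL,
        data=json.dumps(spec).encode("utf-8"),
        headers={
            "Authorization": f"Basic {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def submit_job(request: urllib.request.Request) -> dict:
    """Send the request and return the parsed job details."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example call, assuming POLARIS_API_KEY is set in the environment:
# import os
# job = submit_job(build_request(build_job_spec('"minutes_watched" > 10'),
#                                os.environ["POLARIS_API_KEY"]))
```

Keeping the spec construction separate from the network call lets you inspect or log the payload before submitting it.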

Sample response

A successful request returns a 201 Created response and the details of the ingestion job.

{
"type": "batch",
"id": "e208821f-7097-4273-a003-fae147d55847",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-12-09T19:16:42.748039903Z",
"lastUpdatedTimestamp": "2022-12-09T19:16:42.748039903Z",
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"formatSettings": {
"format": "csv",
"skipHeaderRows": 0
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
]
}

The example data has 19 rows. Polaris ingests only the 13 rows where minutes_watched is greater than 10.
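
To check the expected row count before submitting a job, you can apply the same condition to a local copy of the file. The sketch below assumes that shows.csv has no header row and that its columns follow the order of the inputSchema in the sample request, with minutes_watched in the fifth position. The function name count_matching is hypothetical.

```python
import csv
from typing import Iterable, Tuple


def count_matching(lines: Iterable[str], threshold: int = 10) -> Tuple[int, int]:
    """Return (total_rows, rows_passing) for minutes_watched > threshold."""
    total = passing = 0
    for row in csv.reader(lines):
        if not row:
            continue  # skip blank lines
        total += 1
        # Assumption: minutes_watched is the fifth column and holds an integer.
        if int(row[4]) > threshold:
            passing += 1
    return total, passing


# Example call against a local copy of the file:
# with open("shows.csv", newline="") as f:
#     print(count_matching(f))
```

This is only a local approximation of the filter. A value that is not an integer raises ValueError here, which may differ from how Polaris handles unparseable rows.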

Ingest data limited to a specific time interval

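Filter expressions can also limit the ingested data by time. For example, the following expression keeps only records whose date value is later than the given timestamp: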

"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'"

For additional examples, see the timestamp filter examples.
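
If the cutoff comes from application code, the expression can be generated from a datetime value rather than typed by hand. This is a minimal sketch. The helper name after_timestamp_filter is hypothetical, and the literal format mirrors the example above.

```python
import json
from datetime import datetime


def after_timestamp_filter(column: str, cutoff: datetime) -> str:
    """Build a filter that keeps rows where column is later than cutoff."""
    literal = cutoff.strftime("%Y-%m-%d %H:%M:%S")
    return f"\"{column}\" > TIMESTAMP '{literal}'"


expression = after_timestamp_filter("date", datetime(2022, 5, 21, 23, 59, 59))
# Serializing with json.dumps escapes the double quotes for the payload.
print(json.dumps({"filterExpression": expression}))
# prints {"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'"}
```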

Info: Do not use CURRENT_TIMESTAMP or CURRENT_DATE in a filter, since Polaris translates the function to an actual, static timestamp when the job begins. This timestamp does not update over time. For streaming ingestion jobs, you can set the earliest or latest rejection periods.
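
For a batch job, one way to make the fixed cutoff explicit is to compute it in the client at submission time and pass it as a literal. The sketch below illustrates that idea under two assumptions: that a relative window such as "the last N days" is what you want, and that the timestamps are interpreted as UTC. The helper name cutoff_filter is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def cutoff_filter(column: str, now: datetime, days_back: int) -> str:
    """Build a filter with a literal cutoff that is days_back days before now."""
    cutoff = now - timedelta(days=days_back)
    # The literal is fixed when this function runs, so the job spec shows
    # exactly which cutoff applies.
    literal = cutoff.strftime("%Y-%m-%d %H:%M:%S")
    return f"\"{column}\" > TIMESTAMP '{literal}'"


# Example call at submission time (UTC is an assumption):
# cutoff_filter("date", datetime.now(timezone.utc), 2)
```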

Time intervals also apply when you replace data in a table. The replacement time interval refers to a section of data that already exists in the table. The time interval in the filter expression, by contrast, applies to the source data, not to the existing table data being replaced.

Sample request

The example data contains events from 2022-05-19 to 2022-05-23. The following example shows how to ingest only data from the last two days, 2022-05-22 to 2022-05-23.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Filters demo"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"shows.csv"
],
"inputSchema": [
{
"name": "date",
"dataType": "string"
},
{
"name": "uid",
"dataType": "string"
},
{
"name": "show",
"dataType": "string"
},
{
"name": "episode",
"dataType": "string"
},
{
"name": "minutes_watched",
"dataType": "long"
}
],
"formatSettings": {
"format": "csv"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")"
},
{
"columnName": "uid",
"expression": "\"uid\""
},
{
"columnName": "show",
"expression": "\"show\""
},
{
"columnName": "episode",
"expression": "\"episode\""
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\""
}
],
"filterExpression": "\"date\" > TIMESTAMP '\''2022-05-21 23:59:59'\''"
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job. When the ingestion job completes, the table should contain eight rows corresponding to the last two days of data.

{
"source": {
"fileList": [
"shows.csv"
],
"formatSettings": {
"columns": [],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"type": "uploaded"
},
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'",
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")",
"isAggregation": null
},
{
"columnName": "uid",
"expression": "\"uid\"",
"isAggregation": null
},
{
"columnName": "show",
"expression": "\"show\"",
"isAggregation": null
},
{
"columnName": "episode",
"expression": "\"episode\"",
"isAggregation": null
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-06-12T22:10:33.400972902Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "21947f85-227c-4c63-b052-1c511bb7091d",
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-06-12T22:10:33.400972902Z",
"spec": {
"source": {
"fileList": [
"shows.csv"
],
"formatSettings": {
"columns": [],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"inputSchema": [
{
"dataType": "string",
"name": "date"
},
{
"dataType": "string",
"name": "uid"
},
{
"dataType": "string",
"name": "show"
},
{
"dataType": "string",
"name": "episode"
},
{
"dataType": "long",
"name": "minutes_watched"
}
],
"type": "uploaded"
},
"target": {
"tableName": "Filters demo",
"type": "table",
"intervals": []
},
"filterExpression": "\"date\" > TIMESTAMP '2022-05-21 23:59:59'",
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"date\")",
"isAggregation": null
},
{
"columnName": "uid",
"expression": "\"uid\"",
"isAggregation": null
},
{
"columnName": "show",
"expression": "\"show\"",
"isAggregation": null
},
{
"columnName": "episode",
"expression": "\"episode\"",
"isAggregation": null
},
{
"columnName": "minutes_watched",
"expression": "\"minutes_watched\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"type": "batch",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "Filters demo",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "batch",
"completedTimestamp": null,
"startedTimestamp": null
}
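
The response is verbose, so it can help to pull out only the fields you need to track the job. The following sketch reads the id, executionStatus, health, and filterExpression fields that appear in the sample responses. The function name summarize_job is hypothetical.

```python
import json


def summarize_job(response_body: str) -> dict:
    """Extract the fields most useful for tracking an ingestion job."""
    job = json.loads(response_body)
    return {
        "id": job["id"],
        "executionStatus": job["executionStatus"],
        "health": job["health"]["status"],
        # Not every sample response includes this field, so use .get().
        "filterExpression": job.get("filterExpression"),
    }
```

For the response above, the summary would contain the job ID 21947f85-227c-4c63-b052-1c511bb7091d with an executionStatus of pending.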

Learn more