Filter data to ingest by API

When ingesting data into a table, you can define expressions to limit the ingested data to records that satisfy some condition. Specify conditions using SQL syntax for a WHERE clause. You can also filter records during ingestion using the UI.
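
For example, the following are valid filter expressions. The column names come from the sample data used later in this topic; the compound condition is illustrative only:

"minutes_watched" > 10
"minutes_watched" > 10 AND "show" IS NOT NULL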

Before following along with this topic, familiarize yourself with the following concepts:

  • Create a table
  • Create an ingestion job
  • Upload files
  • Ingest from files

Prerequisites

The examples in this topic use the data in the file shows.csv. Upload the data file to your Polaris staging area.

You will also need an API key with the following permissions:

  • ManageTables
  • ManageIngestionJobs

In the examples below, Polaris reads the API key value from the environment variable named POLARIS_API_KEY. See Authenticate with API keys to obtain an API key and assign service account permissions. For more information on permissions, see Permissions reference.

Replace ORGANIZATION_NAME with the custom domain through which you access Polaris, for example https://ORGANIZATION_NAME.app.imply.io. In the API URLs, also replace REGION and CLOUD_PROVIDER with the region and cloud provider of your Polaris infrastructure.
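
If you haven't already uploaded shows.csv, the following is a minimal sketch of uploading it by API. It assumes the Files v2 API accepts a multipart POST to the /v2/files endpoint with a form field named file; see Upload files for the authoritative steps.

import os
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/files"

apikey = os.getenv("POLARIS_API_KEY")

# Upload shows.csv to the Polaris staging area.
# Assumption: the endpoint and the "file" form field follow the Upload files topic.
with open("shows.csv", "rb") as f:
    response = requests.post(
        url,
        headers={"Authorization": f"Basic {apikey}"},
        files={"file": f},
    )

print(response.text)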

Create a table

Create a table to store the ingested data.

The following request creates a detail table named Filters demo:

cURL:

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "detail",
  "name": "Filters demo",
  "partitioningGranularity": "day",
  "timeResolution": "hour",
  "schema": [
    {
      "name": "__time",
      "dataType": "timestamp"
    },
    {
      "name": "uid",
      "dataType": "string"
    },
    {
      "name": "show",
      "dataType": "string"
    },
    {
      "name": "episode",
      "dataType": "string"
    },
    {
      "name": "minutes_watched",
      "dataType": "long"
    }
  ]
}'

Python:

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
  "type": "detail",
  "name": "Filters demo",
  "partitioningGranularity": "day",
  "timeResolution": "hour",
  "schema": [
    {
      "name": "__time",
      "dataType": "timestamp"
    },
    {
      "name": "uid",
      "dataType": "string"
    },
    {
      "name": "show",
      "dataType": "string"
    },
    {
      "name": "episode",
      "dataType": "string"
    },
    {
      "name": "minutes_watched",
      "dataType": "long"
    }
  ]
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Ingest data based on a filter expression

Send a POST request to the /v2/jobs endpoint to create an ingestion job. Filter records using the filterExpression field of the request payload. The filter expression takes the body of a SQL WHERE clause; don't include the WHERE keyword in the expression itself.

For example, to only ingest records where minutes_watched is greater than 10, set the following filter expression in the request payload:

"filterExpression": "\"minutes_watched\" > 10"

See the Jobs v2 API documentation for a description of required parameters.

Sample request

The following example request ingests data from the shows.csv file where "minutes_watched" > 10:

cURL:

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "Filters demo"
  },
  "source": {
    "type": "uploaded",
    "fileList": [
      "shows.csv"
    ],
    "inputSchema": [
      {
        "name": "date",
        "dataType": "string"
      },
      {
        "name": "uid",
        "dataType": "string"
      },
      {
        "name": "show",
        "dataType": "string"
      },
      {
        "name": "episode",
        "dataType": "string"
      },
      {
        "name": "minutes_watched",
        "dataType": "long"
      }
    ],
    "formatSettings": {
      "format": "csv"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"date\")"
    },
    {
      "columnName": "uid",
      "expression": "\"uid\""
    },
    {
      "columnName": "show",
      "expression": "\"show\""
    },
    {
      "columnName": "episode",
      "expression": "\"episode\""
    },
    {
      "columnName": "minutes_watched",
      "expression": "\"minutes_watched\""
    }
  ],
  "filterExpression": "\"minutes_watched\" > 10"
}'

Python:

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "Filters demo"
  },
  "source": {
    "type": "uploaded",
    "fileList": [
      "shows.csv"
    ],
    "inputSchema": [
      {
        "name": "date",
        "dataType": "string"
      },
      {
        "name": "uid",
        "dataType": "string"
      },
      {
        "name": "show",
        "dataType": "string"
      },
      {
        "name": "episode",
        "dataType": "string"
      },
      {
        "name": "minutes_watched",
        "dataType": "long"
      }
    ],
    "formatSettings": {
      "format": "csv"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"date\")"
    },
    {
      "columnName": "uid",
      "expression": "\"uid\""
    },
    {
      "columnName": "show",
      "expression": "\"show\""
    },
    {
      "columnName": "episode",
      "expression": "\"episode\""
    },
    {
      "columnName": "minutes_watched",
      "expression": "\"minutes_watched\""
    }
  ],
  "filterExpression": "\"minutes_watched\" > 10"
})
headers = {
  'Authorization': f'Basic {apikey}',
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job.

{
    "type": "batch",
    "id": "e208821f-7097-4273-a003-fae147d55847",
    "target": {
        "type": "table",
        "tableName": "Filters demo"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-12-09T19:16:42.748039903Z",
    "lastUpdatedTimestamp": "2022-12-09T19:16:42.748039903Z",
    "source": {
        "type": "uploaded",
        "fileList": [
            "shows.csv"
        ],
        "inputSchema": [
            {
                "dataType": "string",
                "name": "date"
            },
            {
                "dataType": "string",
                "name": "uid"
            },
            {
                "dataType": "string",
                "name": "show"
            },
            {
                "dataType": "string",
                "name": "episode"
            },
            {
                "dataType": "long",
                "name": "minutes_watched"
            }
        ],
        "formatSettings": {
            "format": "csv",
            "skipHeaderRows": 0
        }
    },
    "ingestionMode": "append",
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"date\")"
        },
        {
            "columnName": "uid",
            "expression": "\"uid\""
        },
        {
            "columnName": "show",
            "expression": "\"show\""
        },
        {
            "columnName": "episode",
            "expression": "\"episode\""
        },
        {
            "columnName": "minutes_watched",
            "expression": "\"minutes_watched\""
        }
    ]
}
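
From this response you can capture the job id and track the job until it finishes. The following is a minimal sketch in Python, assuming the Jobs v2 API also exposes GET /v2/jobs/{jobId} returning the same job object; see the Jobs v2 API documentation for the exact endpoint and status values.

import os
import time
import requests

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")
headers = {'Authorization': f'Basic {apikey}'}

# `response` is the POST response from the sample request above.
job_id = response.json()["id"]

# Poll the job until it leaves the "pending" and "running" states.
# Assumption: GET /v2/jobs/{jobId} returns the job with its current executionStatus.
while True:
    status = requests.get(f"{url}/{job_id}", headers=headers).json()["executionStatus"]
    print(status)
    if status not in ("pending", "running"):
        break
    time.sleep(10)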

The example data has 19 rows. Polaris only ingests 13 rows based on the values of minutes_watched:

[Image: Ingested table with filters]

Learn more

  • Create an ingestion job to learn how to create an ingestion job and filter data in the UI.
  • Jobs v2 API for information on creating and managing ingestion jobs.