Ingest data from Amazon S3 by API

You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Amazon S3.

This topic describes how to create a connection to an Amazon S3 bucket and ingest data from the bucket into a Polaris table. For information on setting up ingestion jobs based on connections in the Polaris UI, see Create a connection.

For a list of all ingestion options, see Sources.

Prerequisites

Before you set up ingestion from S3, review Ingest from Amazon S3 for the required information regarding your S3 bucket and AWS access management. Ensure you grant access to the Imply role by listing the ARN of Imply's role as a principal in your trust policy. For more information, see Get Imply role ARN to grant access to AWS resources.

You also need a Polaris API key with the following permissions:

  • ManageTables
  • ManageConnections
  • ManageIngestionJobs

The examples below read the key value from an environment variable named POLARIS_API_KEY. See Authenticate with API keys to learn how to obtain an API key and assign permissions to a service account. For more information on permissions, see Permissions reference.
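
For example, the Python samples in this topic read the key as follows. This is only a minimal sketch of the setup the samples assume; the requests fail if the variable is unset or if the key lacks the permissions listed above.

import os

# All Python samples in this topic read the API key from this environment
# variable and pass it in a Basic Authorization header.
apikey = os.getenv("POLARIS_API_KEY")
if not apikey:
    raise SystemExit("Set the POLARIS_API_KEY environment variable before running the samples.")

headers = {"Authorization": f"Basic {apikey}"}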

Create a table

Create a table that will receive the data from the ingestion job connected to the S3 bucket.

The examples in this topic use the aggregate table Koalas Subset created in Create a table with a schema.
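
If you prefer to create the table programmatically, you can send a POST request to the /v2/tables endpoint (Tables v2 API). The sketch below is illustrative only: the table name matches the examples in this topic, but the schema shown is an assumption, so consult the Tables v2 API documentation for the exact request body.

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"

apikey = os.getenv("POLARIS_API_KEY")

# Illustrative request body; the exact schema format is defined by the Tables v2 API.
payload = json.dumps({
    "type": "aggregate",
    "name": "Koalas Subset",
    "schema": [
        {"name": "__time", "dataType": "timestamp"},
        {"name": "city", "dataType": "string"},
        {"name": "session", "dataType": "string"},
        {"name": "max_session_length", "dataType": "long"}
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)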

Create a connection to Amazon S3

Send a POST request to the /v2/connections endpoint to create a connection. When creating a connection to an S3 bucket, specify the bucket name, S3 API endpoint, and the details of the AWS role for access to the S3 bucket. For the requisite information to connect to an S3 bucket from Polaris, see Ingest from Amazon S3.

Each connection is associated with a single bucket, which is treated as a single source of data files. If you plan to ingest data from multiple buckets, create a new connection for each one.

You can limit the objects available through the connection by specifying a prefix property when you create the connection. For example, suppose your bucket has the following objects:

  • projectA/file01.json
  • projectA/file02.json
  • projectB/file01.json
  • projectB/file02.json

Include "prefix": "projectA/" in the request payload to make only projectA/file01.json and projectA/file02.json available through the connection. When you create the ingestion job, you can specify additional prefixes or other object descriptors to select particular objects available in the connection.

For a description of the required parameters, see the Connections v2 API documentation.

Sample request

The following example request creates a connection named demo-conn to an S3 bucket named demo-bucket.

Replace ORGANIZATION_NAME, REGION, and CLOUD_PROVIDER in the URL, as well as the values for name, awsAssumedRoleArn, awsEndpoint, and bucket, with the values for your setup.

cURL
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "s3",
    "name": "demo-conn",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "bucket": "demo-bucket"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "s3",
    "name": "demo-conn",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "bucket": "demo-bucket"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response and the details of the successful connection, for example:

{
    "type": "s3",
    "name": "demo-conn",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "submittedOnTimestamp": "2022-10-27T21:04:03Z",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "modifiedOnTimestamp": "2022-10-27T21:04:03Z",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "bucket": "demo-bucket"
}
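
To verify the connection afterward, you can fetch it by name. This is a minimal sketch assuming the Connections v2 API exposes the connection at GET /v2/connections/{name}; see the Connections v2 API documentation for the authoritative endpoint details.

import os
import requests

apikey = os.getenv("POLARIS_API_KEY")

# Fetch the connection created above by name (assumed GET endpoint).
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/demo-conn"
response = requests.get(url, headers={"Authorization": f"Basic {apikey}"})
print(response.json())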

Ingest from S3

Submit a POST request to the /v2/jobs endpoint to create a batch ingestion job. In the request body, set the type property of the source object to s3 and reference the connection by name in connectionName. Do not use the connection source type.

Designate the objects to ingest by defining one of the following object descriptors:

  • uris: S3 object URIs.
  • prefixes: Object prefixes. Requires the s3:ListBucket permission for the given prefixes.
  • objects: Object names.
  • pattern: A wildcard pattern for object key names. A wildcard pattern, or a glob expression, accepts wildcards such as * and ? to specify sets of filenames. For supported wildcards and examples, see the Oracle documentation.

For example, consider a bucket named zoo with a folder named penguins. The folder contains the following objects: adelaide.json, chinstrap.json, emperor.json, gentoo.json. The following source designations are equivalent:

  • "uris": ["s3://zoo/penguins/adelaide.json", "s3://zoo/penguins/chinstrap.json", "s3://zoo/penguins/emperor.json", "s3://zoo/penguins/gentoo.json"]
  • "prefixes": ["penguins/"]
  • "objects": ["penguins/adelaide.json", "penguins/chinstrap.json", "penguins/emperor.json", "penguins/gentoo.json"]
  • "pattern": "penguins/*.json"
  • "pattern": "**.json"

Ensure that the role you provide for the S3 connection has read access to the specified objects. For more information, see Ingest from Amazon S3.

In the request payload, specify the format of the S3 objects in formatSettings. Polaris requires all objects in an ingestion job to have the same file type. Create a separate job for each file type you want to ingest.

See the Jobs v2 API documentation for a complete description of the required parameters.

Sample request

The following example request creates a batch ingestion job for the Koalas Subset table using the following details:

  • A connection named demo-conn, which points to the S3 bucket named demo-bucket
  • The S3 object polaris-ingest/demo-file.json.gz, specified in uris

cURL
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
    "type": "batch",
    "target": {
        "type": "table",
        "tableName": "Koalas Subset"
    },
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        }
    ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "batch",
    "target": {
        "type": "table",
        "tableName": "Koalas Subset"
    },
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        }
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
    "type": "batch",
    "id": "674f3355-7e17-4158-8f53-de3d5b4ee7c4",
    "target": {
        "type": "table",
        "tableName": "Koalas Subset"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-10-28T21:36:36.547149211Z",
    "lastUpdatedTimestamp": "2022-10-28T21:36:36.547149211Z",
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    "ingestionMode": "append",
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ]
}
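
After you create the job, you can track its progress using the id from the response. The following is a minimal sketch that assumes the Jobs v2 API returns the job details, including executionStatus, from GET /v2/jobs/{jobId}.

import os
import time
import requests

apikey = os.getenv("POLARIS_API_KEY")

# Poll the job created above until it leaves the pending/running states.
job_id = "674f3355-7e17-4158-8f53-de3d5b4ee7c4"  # id from the sample response
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/{job_id}"
headers = {"Authorization": f"Basic {apikey}"}

while True:
    status = requests.get(url, headers=headers).json().get("executionStatus")
    print(status)
    if status not in ("pending", "running"):
        break
    time.sleep(10)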

Learn more

See the following topics for more information:

  • Ingest from Amazon S3 for reference information on connecting Polaris to Amazon S3.
  • Tables v2 API for information on creating and managing tables.
  • Connections v2 API for information on creating and managing connections.
  • Jobs v2 API for information on creating and managing ingestion jobs.