Ingest data from Amazon S3 by API
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest data from Amazon S3.
This topic covers the process you need to follow to create a connection to an Amazon S3 bucket and ingest data from the bucket into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For an end-to-end guide on S3 ingestion in Polaris, see Guide for S3 ingestion.
Prerequisites
Before you create a connection to ingest from S3, complete the following:

- Review Connect to Amazon S3 for the required information to create the connection.
- Ensure your AWS IAM role has permission to access your data and grants permission for Imply to assume your role. For more information, see Secure connection to AWS and Get Imply role ARN to grant access to AWS resources.
- If you don't have one already, create a Polaris API key with the `ManageConnections` permission. If you plan to create tables or ingestion jobs, you also need the `ManageTables` and `ManageIngestionJobs` permissions, respectively. For more information on permissions, see Permissions reference. The examples in this topic use a variable named `POLARIS_API_KEY` to store the API key.
You do not have to create a table before starting an ingestion job. When you set `createTableIfNotExists` to `true` in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.
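The request samples in this topic all send the API key after `Basic` in the `Authorization` header. As a minimal sketch, the headers can be built once from the `POLARIS_API_KEY` environment variable (the fallback value below is a placeholder, not a real key):

```python
import os

def auth_headers(apikey: str) -> dict:
    """Request headers used by the samples in this topic."""
    return {
        "Authorization": f"Basic {apikey}",
        "Content-Type": "application/json",
    }

# Read the key from the environment, as the Python samples below do.
print(auth_headers(os.getenv("POLARIS_API_KEY", "pok_placeholder")))
```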
Create a connection to Amazon S3
Send a `POST` request to the `/v1/projects/PROJECT_ID/connections` endpoint to create a connection.
Each connection is associated with a single bucket, which is treated as a single source of data files.
If you plan to ingest data from multiple buckets, create a new connection for each one.
Required properties
To create a connection to an S3 bucket, the following properties are required:

- `type`: The connection type, `s3`.
- `name`: Name to identify the Polaris connection. You cannot change this later.
- `bucket`: Name of the S3 bucket containing the objects to ingest.
- `awsEndpoint`: Endpoint of the S3 service, such as `s3.us-east-1.amazonaws.com`.
To learn more about S3 connection requirements, see S3 connection information.
Authentication
To grant Polaris access to your S3 objects through this connection, provide authentication credentials using IAM role assumption or access keys.
To authenticate using IAM role assumption, supply `awsAssumedRoleArn` in the connection request, and set its value to the ARN of your AWS IAM role. Your role must have S3 permissions assigned and must list the ARN of Imply's role as a principal. For more information, see Secure connections to AWS.
To authenticate using access keys, include a `secrets` object in the request body when you create the connection. Supply the access key ID and secret access key in the `secrets` object. For details on AWS access keys, refer to the AWS IAM documentation.
Your request payload should resemble the following:
```json
{
    "type": "s3",
    "name": "demo-conn",
    "bucket": "demo-bucket",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "secrets": {
        "type": "access_key",
        "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
        "accessKeySecret": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    }
}
```
Limit connection access
You can limit the objects available through a connection by specifying the optional `prefix` property when you create the connection.
For example, suppose your bucket has the following objects:

```
projectA/file01.json
projectA/file02.json
projectB/file01.json
projectB/file02.json
```
Include `"prefix": "projectA/"` in the request payload to make only `projectA/file01.json` and `projectA/file02.json` available through the connection.
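For instance, a connection payload scoped to `projectA/` might look like the following sketch (the connection name here is illustrative):

```python
import json

# Hypothetical connection payload: the optional "prefix" property restricts
# the connection to objects under projectA/.
payload = {
    "type": "s3",
    "name": "project-a-conn",  # illustrative name
    "bucket": "demo-bucket",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "prefix": "projectA/",
}
print(json.dumps(payload, indent=2))
```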
When you create the ingestion job, you can specify additional prefixes or other object descriptors to select particular objects available in the connection.
Sample request
The following example request creates a connection named `demo-conn` to an S3 bucket named `demo-bucket`.
- cURL
- Python
```shell
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
  --header "Authorization: Basic $POLARIS_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "type": "s3",
    "name": "demo-conn",
    "bucket": "demo-bucket",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role"
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "s3",
    "name": "demo-conn",
    "bucket": "demo-bucket",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
```
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
```json
{
    "type": "s3",
    "name": "demo-conn",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "submittedOnTimestamp": "2022-10-27T21:04:03Z",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "modifiedOnTimestamp": "2022-10-27T21:04:03Z",
    "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
    "awsEndpoint": "s3.us-east-1.amazonaws.com",
    "bucket": "demo-bucket"
}
```
Ingest from S3
Submit a `POST` request to the `/v1/projects/PROJECT_ID/jobs` endpoint to create a batch ingestion job. In the request body, set the `type` property of the `source` object to `s3`. Do not use the `connection` type.
Select objects from connection
Designate the objects to ingest by defining one of the following object descriptors:

- `uris`: S3 object URIs.
- `prefixes`: Object prefixes. Requires the `s3:ListBucket` permission for the given prefixes.
- `objects`: Object names.
- `pattern`: A wildcard pattern for object key names. A wildcard pattern, or glob expression, accepts wildcards such as `*` and `?` to specify sets of filenames. For supported wildcards and examples, see the Oracle documentation.
For example, consider a bucket named `zoo` with a folder named `penguins`. The folder contains the following objects: `adelaide.json`, `chinstrap.json`, `emperor.json`, and `gentoo.json`.
The following source designations are equivalent:
```
# URIs
"uris": ["s3://zoo/penguins/adelaide.json", "s3://zoo/penguins/chinstrap.json", "s3://zoo/penguins/emperor.json", "s3://zoo/penguins/gentoo.json"]

# Prefixes
"prefixes": ["penguins/"]

# Objects
"objects": ["penguins/adelaide.json", "penguins/chinstrap.json", "penguins/emperor.json", "penguins/gentoo.json"]

# Wildcard pattern
"pattern": "penguins/*.json"

# Wildcard pattern
"pattern": "**.json"
```
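As a rough illustration of how glob matching selects objects, Python's `fnmatch` approximates the wildcard behavior shown above; the exact semantics Polaris supports are described in the linked Oracle documentation:

```python
from fnmatch import fnmatch

keys = [
    "penguins/adelaide.json",
    "penguins/chinstrap.json",
    "penguins/notes.txt",
]

# Keep only JSON objects under penguins/, mirroring "penguins/*.json".
matched = [k for k in keys if fnmatch(k, "penguins/*.json")]
print(matched)
```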
Ensure that the authentication mechanism you provided in the connection has read access to the specified objects.
Sample request
The following example request creates a batch ingestion job for the `Koalas` table using the following details:

- Connection named `demo-conn`, which points to the S3 bucket named `demo-bucket`
- S3 object identified as `polaris-ingest/demo-file.json.gz`, defined in `uris`
In the request payload, specify the format of the S3 objects in `formatSettings`. Polaris requires all objects in an ingestion job to have the same file type. Create a separate job for each file type to ingest.
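Since a single job accepts only one file type, one way to prepare a mixed set of objects is to group keys by extension and submit one job per group. A sketch with illustrative key names:

```python
from collections import defaultdict
from pathlib import PurePosixPath

keys = [
    "ingest/events-01.json.gz",
    "ingest/events-02.json.gz",
    "ingest/lookup.csv",
]

# Group keys by their full suffix chain so "events-01.json.gz" groups
# under ".json.gz"; each group would become a separate ingestion job.
jobs = defaultdict(list)
for key in keys:
    suffix = "".join(PurePosixPath(key).suffixes)
    jobs[suffix].append(key)

for suffix, group in sorted(jobs.items()):
    print(suffix, group)
```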
See the Jobs v1 API documentation for a complete description of the required parameters.
- cURL
- Python
```shell
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
  --header "Authorization: Basic $POLARIS_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "type": "batch",
    "target": {
        "type": "table",
        "tableName": "Koalas"
    },
    "createTableIfNotExists": true,
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")",
            "isAggregation": true
        }
    ]
}'
```
```python
import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "batch",
    "target": {
        "type": "table",
        "tableName": "Koalas"
    },
    "createTableIfNotExists": True,
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "formatSettings": {
            "format": "nd-json"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ]
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")",
            "isAggregation": True
        }
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
```
Sample response
A successful request returns a `201 Created` response and the details of the ingestion job:
```json
{
    "type": "batch",
    "id": "674f3355-7e17-4158-8f53-de3d5b4ee7c4",
    "target": {
        "type": "table",
        "tableName": "Koalas"
    },
    "createTableIfNotExists": true,
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-10-28T21:36:36.547149211Z",
    "lastUpdatedTimestamp": "2022-10-28T21:36:36.547149211Z",
    "source": {
        "type": "s3",
        "connectionName": "demo-conn",
        "uris": [
            "s3://demo-bucket/polaris-ingest/demo-file.json.gz"
        ],
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "long",
                "name": "session_length"
            }
        ],
        "formatSettings": {
            "format": "nd-json"
        }
    },
    "ingestionMode": "append",
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ]
}
```
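After submitting the job, you can poll its status until it stops. The sketch below assumes the Jobs v1 API exposes a `GET /v1/projects/PROJECT_ID/jobs/JOB_ID` endpoint whose response carries `executionStatus` as in the sample above; check the Jobs v1 API documentation for the exact endpoint and status values.

```python
import time

def wait_for_job(fetch_job, poll_seconds=10, max_polls=60):
    """Poll fetch_job() until executionStatus leaves pending/running."""
    for _ in range(max_polls):
        status = fetch_job()["executionStatus"]
        if status not in ("pending", "running"):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state")

# With requests, fetch_job could be a closure such as:
#   lambda: requests.get(job_url, headers={"Authorization": f"Basic {apikey}"}).json()

# Stubbed demonstration: the job moves from pending to running to a final state.
responses = iter({"executionStatus": s} for s in ("pending", "running", "completed"))
print(wait_for_job(lambda: next(responses), poll_seconds=0))
```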
Learn more
See the following topics for more information:
- Guide for S3 ingestion for an end-to-end guide on S3 ingestion in Polaris.
- Connect to Amazon S3 for reference on connecting from Amazon S3 to Polaris.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.