Ingest data from Amazon S3 by API
Project-less regional API resources have been deprecated and will be removed by the end of September 2024.
You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023.
For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID
Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
You can use the Imply Polaris Connections v1 API and Jobs v1 API to ingest event data from Amazon S3.
This topic covers the process you need to follow to create a connection to an Amazon S3 bucket and ingest data from the bucket into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For an end-to-end guide on S3 ingestion in Polaris, see Guide for S3 ingestion.
Prerequisites
Before you create a connection to ingest from S3, complete the following:
Review Connect to Amazon S3 for the required information to create the connection.
Ensure your AWS IAM role has permission to access your data and grants permission for Imply to assume your role. For more information, see Secure connection to AWS and Get Imply role ARN to grant access to AWS resources.
If you don't have one already, create a Polaris API key with the
ManageConnections
permission. If you plan to create tables or ingestion jobs, you also needManageTables
andManageIngestionJobs
, respectively. For more information on permissions, visit Permissions reference. The examples in this topic use a variable namedPOLARIS_API_KEY
to store the API key.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists
to true
in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Create a connection to Amazon S3
Send a POST
request to the /v1/projects/PROJECT_ID/connections
endpoint to
create a connection.
Each connection is associated with a single bucket, which is treated as a single source of data files.
If you plan to ingest data from multiple buckets, create a new connection for each one.
To create a connection to an S3 bucket, the following properties are required:
name
: Name to identify the Polaris connection. You cannot change this later.bucket
: Name of the S3 bucket containing the objects to ingest.awsEndpoint
: The endpoint of the S3 service, such ass3.us-east-1.amazonaws.com
.awsAssumedRoleArn
: The ARN of your AWS role with S3 permissions assigned.
To learn more about S3 connection requirements, see S3 connection information.
You can limit the objects available in the connection by specifying an optional prefix
property.
The prefix you specify when creating a connection limits the objects available through the connection.
For example, suppose your bucket has the following objects:
projectA/file01.json
projectA/file02.json
projectB/file01.json
projectB/file02.json
Include "prefix": "projectA/"
in the request payload to make only projectA/file01.json
and projectA/file02.json
available through the connection.
When you create the ingestion job, you can specify additional prefixes or other object descriptors to select particular objects available in the connection.
Sample request
The following example request creates a connection named demo-conn
to an S3 bucket named demo-bucket
.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "s3",
"name": "demo-conn",
"bucket": "demo-bucket",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "s3",
"name": "demo-conn",
"bucket": "demo-bucket",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK
response and the details of the successful connection, for example:
{
"type": "s3",
"name": "demo-conn",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2022-10-27T21:04:03Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2022-10-27T21:04:03Z",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"bucket": "demo-bucket"
}
Ingest from S3
Submit a POST
request to the /v1/projects/PROJECT_ID/jobs
endpoint to create a batch ingestion job.
In the request body, set the type
property of the source
object to s3
.
Do not use the connection
type.
Designate the objects to ingest by defining one of the following object descriptors:
uris
: S3 object URIs.prefixes
: Object prefixes. Requires thes3:ListBucket
permission for the given prefixes.objects
: Object names.pattern
: A wildcard pattern for object key names. A wildcard pattern, or a glob expression, accepts wildcards such as*
and?
to specify sets of filenames. For supported wildcards and examples, see the Oracle documentation.
For example, consider a bucket named zoo
with a folder named penguins
.
The folder contains the following objects: adelaide.json
, chinstrap.json
, emperor.json
, gentoo.json
.
The following source designations are equivalent:
"uris": ["s3://zoo/penguins/adelaide.json", "s3://zoo/penguins/chinstrap.json", "s3://zoo/penguins/emperor.json", "s3://zoo/penguins/gentoo.json"]
"prefixes": ["penguins/"]
"objects": ["penguins/adelaide.json", "penguins/chinstrap.json", "penguins/emperor.json", "penguins/gentoo.json"]
"pattern": "penguins/*.json"
"pattern": "**.json"
Ensure that the role you provide for the S3 connection has read access to the specified objects. For more information, see Connect to Amazon S3.
Sample request
The following example request creates a batch ingestion job for the Koalas
table using the following details:
- Connection named
demo-conn
, which points to the S3 bucket nameddemo-bucket
- S3 object identified at
polaris-ingest/demo-file.json.gz
defined inuris
In the request payload, list the format of the S3 objects in formatSettings
.
Polaris requires all objects in an ingestion job to have the same file type.
Create a separate job for each file type to ingest.
See the Jobs v1 API documentation for a complete description of the required parameters.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas"
},
"createTableIfNotExists": true,
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas"
},
"createTableIfNotExists": True,
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created
response and the details of the ingestion job:
Click to view the response
{
"type": "batch",
"id": "674f3355-7e17-4158-8f53-de3d5b4ee7c4",
"target": {
"type": "table",
"tableName": "Koalas"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-10-28T21:36:36.547149211Z",
"lastUpdatedTimestamp": "2022-10-28T21:36:36.547149211Z",
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
]
}
Learn more
See the following topics for more information:
- Guide for S3 ingestion for an end-to-end guide on S3 ingestion in Polaris.
- Connect to Amazon S3 for reference on connecting from Amazon S3 to Polaris.
- Connections v1 API for information on creating and managing connections.
- Jobs v1 API for information on creating and managing ingestion jobs.