Ingest data from Amazon S3 by API
You can use the Imply Polaris Connections v2 API and Jobs v2 API to ingest event data from Amazon S3.
This topic shows you how to create a connection to an Amazon S3 bucket and ingest data from the bucket into a Polaris table. For information on how to set up ingestion jobs based on connections in the Polaris UI, see Create a connection.
For a list of all ingestion options, see Sources.
Prerequisites
Before you set up ingestion from S3, review Ingest from Amazon S3 for the required information regarding your S3 bucket and AWS access management. Ensure you grant access to the Imply role by listing the ARN of Imply's role as a principal in your trust policy. For more information, see Get Imply role ARN to grant access to AWS resources.
You also need a Polaris API key with the following permissions:
ManageTables
ManageConnections
ManageIngestionJobs
In the examples below, the key value is stored in the environment variable named `POLARIS_API_KEY`.
See Authenticate with API keys to obtain an API key and assign service account permissions.
For more information on permissions, visit Permissions reference.
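Before running the examples, store your API key in that environment variable, for example:

```bash
# Replace the placeholder with your own Polaris API key.
export POLARIS_API_KEY="pok_replace_with_your_key"
```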
Create a table
Create a table that will receive the data from the ingestion job connected to the S3 bucket.
The examples in this topic use the aggregate table `Koalas Subset` created in Create a table with a schema.
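If you don't already have a target table, you can create one with the Tables v2 API. The following Python sketch is illustrative only: it assumes a `POST` to the `/v2/tables` endpoint with a table name, table type, and schema, and the column definitions shown are placeholders. See Create a table with a schema and the Tables v2 API documentation for the exact payload.

```python
import os
import requests
import json

# Assumed endpoint for creating tables; see the Tables v2 API documentation.
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/tables"
apikey = os.getenv("POLARIS_API_KEY")

# Placeholder schema: replace with the full Koalas Subset schema.
payload = json.dumps({
    "name": "Koalas Subset",
    "type": "aggregate",
    "schema": [
        {"name": "__time", "dataType": "timestamp"},
        {"name": "city", "dataType": "string"}
    ]
})

response = requests.post(
    url,
    auth=(apikey, ""),
    headers={"Content-Type": "application/json"},
    data=payload
)
print(response.text)
```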
Create a connection to Amazon S3
Send a `POST` request to the `/v2/connections` endpoint to create a connection.
When creating a connection to an S3 bucket, specify the bucket name, S3 API endpoint, and the details of the AWS role for access to the S3 bucket.
For the requisite information to connect to an S3 bucket from Polaris, see Ingest from Amazon S3.
Each connection is associated with a single bucket, which is treated as a single source of data files. If you plan to ingest data from multiple buckets, create a new connection for each one.
You can limit the objects available through the connection by specifying a `prefix` property when you create the connection.
For example, suppose your bucket has the following objects:
projectA/file01.json
projectA/file02.json
projectB/file01.json
projectB/file02.json
Include "prefix": "projectA/"
in the request payload to make only projectA/file01.json
and projectA/file02.json
available through the connection.
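For example, a connection payload scoped to the `projectA/` objects might look like the following. The connection name, role ARN, endpoint, and bucket shown here are placeholders.

```json
{
  "type": "s3",
  "name": "projectA-conn",
  "awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
  "awsEndpoint": "s3.us-east-1.amazonaws.com",
  "bucket": "demo-bucket",
  "prefix": "projectA/"
}
```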
When you create the ingestion job, you can specify additional prefixes or other object descriptors to select particular objects available in the connection.
For a description of the required parameters, see the Connections v2 API documentation.
Sample request
The following example request creates a connection named `demo-conn` to an S3 bucket named `demo-bucket`. Replace `ORGANIZATION_NAME` as well as the values for `name`, `awsAssumedRoleArn`, `awsEndpoint`, and `bucket` with the values for your setup.
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "s3",
"name": "demo-conn",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"bucket": "demo-bucket"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "s3",
"name": "demo-conn",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"bucket": "demo-bucket"
})
headers = {
'Content-Type': 'application/json'
}
# Pass the API key as the Basic auth username, matching the curl example's --user option.
response = requests.post(url, auth=(apikey, ""), headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a `200 OK` response and the details of the connection, for example:
{
"type": "s3",
"name": "demo-conn",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"submittedOnTimestamp": "2022-10-27T21:04:03Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"modifiedOnTimestamp": "2022-10-27T21:04:03Z",
"awsAssumedRoleArn": "arn:aws:iam::012345678901:role/demo-role",
"awsEndpoint": "s3.us-east-1.amazonaws.com",
"bucket": "demo-bucket"
}
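To confirm that Polaris saved the connection, you can retrieve it by name. This sketch assumes the Connections v2 API supports `GET` requests on `/v2/connections/{name}`; check the Connections v2 API documentation for the operations it supports.

```python
import os
import requests

# Assumed endpoint: GET /v2/connections/{name} to read back the stored connection.
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/demo-conn"
apikey = os.getenv("POLARIS_API_KEY")

response = requests.get(url, auth=(apikey, ""))
print(response.json())
```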
Ingest from S3
Submit a `POST` request to the `/v2/jobs` endpoint to create a batch ingestion job.
In the request body, set the `type` property of the `source` object to `s3`. Do not use the `connection` type.
Designate the objects to ingest by defining one of the following object descriptors:
- `uris`: S3 object URIs.
- `prefixes`: Object prefixes. Requires the `s3:ListBucket` permission for the given prefixes.
- `objects`: Object names.
- `pattern`: A wildcard pattern for object key names. A wildcard pattern, or glob expression, accepts wildcards such as `*` and `?` to specify sets of filenames. For supported wildcards and examples, see the Oracle documentation.
For example, consider a bucket named `zoo` with a folder named `penguins`. The folder contains the following objects: `adelaide.json`, `chinstrap.json`, `emperor.json`, and `gentoo.json`.
The following source designations are equivalent:
- `"uris": ["s3://zoo/penguins/adelaide.json", "s3://zoo/penguins/chinstrap.json", "s3://zoo/penguins/emperor.json", "s3://zoo/penguins/gentoo.json"]`
- `"prefixes": ["penguins/"]`
- `"objects": ["penguins/adelaide.json", "penguins/chinstrap.json", "penguins/emperor.json", "penguins/gentoo.json"]`
- `"pattern": "penguins/*.json"`
- `"pattern": "**.json"`
Ensure that the role you provide for the S3 connection has read access to the specified objects. For more information, see Ingest from Amazon S3.
In the request payload, list the format of the S3 objects in `formatSettings`. Polaris requires all objects in an ingestion job to have the same file type. Create a separate job for each file type to ingest.
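For example, a `source` object that selects every object under the `penguins/` prefix and declares newline-delimited JSON format might look like the following. The connection name `zoo-conn` is a placeholder; a complete job request also includes the target, `inputSchema`, and `mappings` shown in the sample request below.

```json
"source": {
  "type": "s3",
  "connectionName": "zoo-conn",
  "prefixes": ["penguins/"],
  "formatSettings": {
    "format": "nd-json"
  }
}
```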
See the Job v2 API documentation for a complete description of the required parameters.
Sample request
The following example request creates a batch ingestion job for the `Koalas Subset` table using the following details:
- Connection named `demo-conn`, which points to the S3 bucket named `demo-bucket`
- S3 object identified at `polaris-ingest/demo-file.json.gz`, defined in `uris`
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"formatSettings": {
"format": "nd-json"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
]
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Content-Type': 'application/json'
}
# Pass the API key as the Basic auth username, matching the curl example's --user option.
response = requests.post(url, auth=(apikey, ""), headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a `201 Created` response and the details of the ingestion job:
{
"type": "batch",
"id": "674f3355-7e17-4158-8f53-de3d5b4ee7c4",
"target": {
"type": "table",
"tableName": "Koalas Subset"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-10-28T21:36:36.547149211Z",
"lastUpdatedTimestamp": "2022-10-28T21:36:36.547149211Z",
"source": {
"type": "s3",
"connectionName": "demo-conn",
"uris": [
"s3://demo-bucket/polaris-ingest/demo-file.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "long",
"name": "session_length"
}
],
"formatSettings": {
"format": "nd-json"
}
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
]
}
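After you submit the job, you can track its progress using the job `id` from the response. The following is a minimal sketch that assumes the Jobs v2 API supports `GET` requests on `/v2/jobs/{jobId}` and returns the `executionStatus` field shown above; see the Jobs v2 API documentation for details.

```python
import os
import time
import requests

# Job ID taken from the sample response above; substitute the id returned by your request.
job_id = "674f3355-7e17-4158-8f53-de3d5b4ee7c4"
url = f"https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs/{job_id}"
apikey = os.getenv("POLARIS_API_KEY")

# Poll until the job leaves the pending and running states.
while True:
    status = requests.get(url, auth=(apikey, "")).json().get("executionStatus")
    print(status)
    if status not in ("pending", "running"):
        break
    time.sleep(30)
```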
Learn more
See the following topics for more information:
- Ingest from Amazon S3 for reference on connecting Polaris to Amazon S3.
- Tables v2 API for information on creating and managing tables.
- Connections v2 API for information on creating and managing connections.
- Jobs v2 API for information on creating and managing ingestion jobs.