Specify data schema by API

When you create an ingestion job to load data into a table in Imply Polaris, you define the schema of the input data and map the input fields to the columns of the Polaris table schema. Regardless of your source data format, all ingestion jobs require an input schema defined in the source.inputSchema field of the job specification, such as the following:

"source": {
    "type": "uploaded",
    "fileList": [
        "kttm-2019-08-20.json.gz"
    ],
    "inputSchema": [
        {
            "dataType": "string",
            "name": "timestamp"
        },
        {
            "dataType": "long",
            "name": "session_length"
        }
    ],
    ...
},

The following data formats require additional schema definition to describe the data:

  • Apache Avro
  • Protocol Buffers

Note: Avro and Protobuf formats are not supported for push streaming ingestion.

To ingest data from one of these formats, you not only specify the input schema in source.inputSchema but also provide the schema for the data in your ingestion job request in source.formatSettings.parseSchemaProvider. You provide the data schema using one of the following sources:

  • An inline specification in which you set the schema definition directly within the payload of the ingestion job request.
  • Confluent Schema Registry, a storage layer for storing, organizing, and retrieving schemas.

If you produce Kafka records using Schema Registry, you must also configure Schema Registry in your Polaris ingestion job. You cannot use an inline schema specification to read records that contain both a schema ID and the serialized data.
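
Records serialized through Schema Registry use the Confluent wire format: a magic byte of 0, a four-byte schema ID in big-endian order, and then the serialized payload. This is why an inline schema cannot parse them; the record carries a schema ID rather than the schema itself. The following sketch is illustrative only and not part of the Polaris API; it shows how an Avro record framed this way is laid out:

import struct

def split_confluent_record(record: bytes):
    # Confluent wire format: byte 0 is the magic byte (always 0),
    # bytes 1-4 hold the Schema Registry schema ID (big-endian),
    # and the remaining bytes are the serialized Avro datum.
    magic, schema_id = struct.unpack(">bI", record[:5])
    if magic != 0:
        raise ValueError("Not a Schema Registry-framed record")
    return schema_id, record[5:]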

This topic walks you through how to provide the schema of your input data, both inline within the job request and by using Confluent Schema Registry.

Prerequisites

To follow along in this topic, you need the following:

  • An API key with the ManageIngestionJobs role to create and manage ingestion jobs. In the examples below, the key value is stored in an environment variable named POLARIS_API_KEY, set as shown in the example after this list. See API key authentication to obtain an API key and assign permissions. Visit User roles reference for more information on roles and their permissions.

  • A table with a defined schema. This topic uses the example table from Create a table with a schema. Create a separate table for each ingestion job example in this topic. The example table contains the following schema:

    • __time: the primary timestamp
    • city: a string dimension
    • session: a string dimension
    • max_session_length: a long measure
  • A connection to a streaming source. The examples in this topic use Confluent Cloud connections whose names start with confluent_, but you can use any supported streaming source. For a list of the sources that Polaris supports for data ingestion, see Ingestion sources overview.
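
The examples in this topic read the API key from the POLARIS_API_KEY environment variable. A minimal setup might look like the following, where the key value is a placeholder:

export POLARIS_API_KEY="pok_XXXXXXXXXXXXXXXX"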

This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. For more information, see the following:

  • Create a table
  • Ingest data from Confluent Cloud

Inline schema

This section walks you through creating an ingestion job from a Confluent Cloud connection while specifying the input schema inline in the job request. Each ingestion job is treated as a separate standalone example.

Avro

Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Job v2 API documentation for a full description of required parameters.

To ingest streaming Avro data using an inline schema specification, define the following fields within formatSettings:

  • format: Set to avro_stream.
  • parseSchemaProvider:
    • type: Set to inline-avro.
    • schema: Pass the JSON object representing the Avro schema for the input data.

The input data in this example is defined by the following Avro schema:

{
  "type": "record",
  "namespace": "io.imply",
  "name": "KttmEvent",
  "fields": [
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "city",
      "type": "string"
    },
    {
      "name": "session",
      "type": "string"
    },
    {
      "name": "session_length",
      "type": "long"
    }
  ]
}

For more information on the Avro data serialization system, see the Apache Avro documentation.
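
With an inline Avro schema, Polaris expects each Kafka record value to be a raw Avro-encoded datum that matches the schema above, without Schema Registry framing. As a rough sketch of the producer side, assuming the fastavro library, you could encode a test event like this:

import io
from fastavro import parse_schema, schemaless_writer

# The same Avro schema passed in parseSchemaProvider.schema
schema = parse_schema({
    "type": "record",
    "namespace": "io.imply",
    "name": "KttmEvent",
    "fields": [
        {"name": "timestamp", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "session", "type": "string"},
        {"name": "session_length", "type": "long"}
    ]
})

# Encode one event as a schemaless Avro datum, ready to publish
# to the topic that the Polaris connection reads from.
buf = io.BytesIO()
schemaless_writer(buf, schema, {
    "timestamp": "2019-08-20T00:00:00Z",
    "city": "Helsinki",
    "session": "S1234",
    "session_length": 42
})
record_value = buf.getvalue()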

Sample request

The following example request creates an ingestion job that reads streaming Avro data using an inline schema:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_avro"
  },
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_avro",
    "inputSchema": [
      {"name": "timestamp", "dataType": "string"},
      {"name": "city", "dataType": "string"},
      {"name": "session", "dataType": "string"},
      {"name": "session_length", "dataType": "float"}
    ],
    "formatSettings": {
      "format": "avro_stream",
      "parseSchemaProvider": {
        "type": "inline-avro",
        "schema": {
          "type": "record",
          "namespace": "io.imply",
          "name": "KttmEvent",
          "fields": [
            {"name": "timestamp", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "session", "type": "string"},
            {"name": "session_length", "type": "long"}
          ]
        }
      }
    }
  },
  "mappings": [
    {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
    {"columnName": "city", "expression": "\"city\""},
    {"columnName": "session", "expression": "\"session\""},
    {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
  ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "koalas_inline_avro"
    },
    "source": {
        "type": "connection",
        "connectionName": "confluent_inline_avro",
        "inputSchema": [
            {"name": "timestamp", "dataType": "string"},
            {"name": "city", "dataType": "string"},
            {"name": "session", "dataType": "string"},
            {"name": "session_length", "dataType": "float"}
        ],
        "formatSettings": {
            "format": "avro_stream",
            "parseSchemaProvider": {
                "type": "inline-avro",
                "schema": {
                    "type": "record",
                    "namespace": "io.imply",
                    "name": "KttmEvent",
                    "fields": [
                        {"name": "timestamp", "type": "string"},
                        {"name": "city", "type": "string"},
                        {"name": "session", "type": "string"},
                        {"name": "session_length", "type": "long"}
                    ]
                }
            }
        }
    },
    "mappings": [
        {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
        {"columnName": "city", "expression": "\"city\""},
        {"columnName": "session", "expression": "\"session\""},
        {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
    "type": "streaming",
    "id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
    "target": {
        "type": "table",
        "tableName": "koalas_inline_avro"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-11-10T20:51:50.753651533Z",
    "lastUpdatedTimestamp": "2022-11-10T20:51:50.753651533Z",
    "source": {
        "type": "connection",
        "connectionName": "confluent_inline_avro",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "float",
                "name": "session_length"
            }
        ],
        "formatSettings": {
            "format": "avro_stream",
            "parseSchemaProvider": {
                "type": "inline-avro",
                "schema": {
                    "type": "record",
                    "namespace": "io.imply",
                    "name": "KttmEvent",
                    "fields": [
                        {
                            "name": "timestamp",
                            "type": "string"
                        },
                        {
                            "name": "city",
                            "type": "string"
                        },
                        {
                            "name": "session",
                            "type": "string"
                        },
                        {
                            "name": "session_length",
                            "type": "long"
                        }
                    ]
                }
            },
            "binaryAsString": false,
            "extractUnionsByType": false
        }
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ],
    "readFromPoint": "latest"
}

Protobuf

Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Job v2 API documentation for a full description of required parameters.

To ingest Protobuf data using an inline schema specification, define the following fields within formatSettings:

  • format: Set to protobuf.
  • parseSchemaProvider:
    • type: Set to inline-protobuf.
    • descriptor: Pass a Base64-encoded string of the compiled Protobuf descriptor.

The input data in this example is defined by the following Protobuf message definition in kttm.proto:

syntax = "proto3";

message SomeMessage {
    string timestamp = 1;
    string city = 2;
    string session = 3;
    uint32 session_length = 4;
}

For a message schema defined in a .proto file, you use protoc to compile the descriptor. For example:

protoc --include_imports --descriptor_set_out=kttm.desc kttm.proto
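
The descriptor field expects the contents of the compiled descriptor file as a Base64-encoded string. For example, one way to produce that string from kttm.desc:

base64 kttm.desc | tr -d '\n'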

For more information on Protobuf, see the Google documentation.

Sample request

The following example request creates an ingestion job that reads streaming Protobuf data using an inline schema:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_protobuf"
  },
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_protobuf",
    "inputSchema": [
      {"name": "timestamp", "dataType": "string"},
      {"name": "city", "dataType": "string"},
      {"name": "session", "dataType": "string"},
      {"name": "session_length", "dataType": "float"}
    ],
    "formatSettings": {
      "format": "protobuf",
      "parseSchemaProvider": {
        "type": "inline-protobuf",
        "descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
      }
    }
  },
  "mappings": [
    {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
    {"columnName": "city", "expression": "\"city\""},
    {"columnName": "session", "expression": "\"session\""},
    {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
  ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "koalas_inline_protobuf"
    },
    "source": {
        "type": "connection",
        "connectionName": "confluent_inline_protobuf",
        "inputSchema": [
            {"name": "timestamp", "dataType": "string"},
            {"name": "city", "dataType": "string"},
            {"name": "session", "dataType": "string"},
            {"name": "session_length", "dataType": "float"}
        ],
        "formatSettings": {
            "format": "protobuf",
            "parseSchemaProvider": {
                "type": "inline-protobuf",
                "descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
            }
        }
    },
    "mappings": [
        {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
        {"columnName": "city", "expression": "\"city\""},
        {"columnName": "session", "expression": "\"session\""},
        {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
    "type": "streaming",
    "id": "dd1041e1-52d5-416d-9fb3-907613a0a678",
    "target": {
        "type": "table",
        "tableName": "koalas_inline_protobuf"
    },
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2022-11-10T21:19:51.20383234Z",
    "lastUpdatedTimestamp": "2022-11-10T21:19:51.20383234Z",
    "source": {
        "type": "connection",
        "connectionName": "confluent_inline_protobuf",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "float",
                "name": "session_length"
            }
        ],
        "formatSettings": {
            "format": "protobuf",
            "parseSchemaProvider": {
                "type": "inline-protobuf",
                "descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
            }
        }
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ],
    "readFromPoint": "latest"
}

Avro OCF

Avro Object Container File (OCF) is a file format that you can use in batch ingestion jobs. Ingesting Avro OCF data is similar to ingesting Avro data. To ingest Avro OCF data, define the following fields within formatSettings:

  • format: Set to avro_ocf.
  • schema: Optionally pass the JSON object representing the Avro schema for the input data. For Avro OCF, the file itself contains the schema used to generate the file. You only need schema if you want to override the schema in the file.

Ensure the job type is batch and that you define the job source as uploaded with the correct filenames in fileList.

See the Job v2 API documentation for a full description of required parameters.

This example uses the Avro OCF file kttm-avro.ocf. Be sure to upload the file to the Polaris staging area before starting the ingestion job.
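
If you want to generate a small OCF file to test with, one option is the fastavro library. Because the OCF format embeds the schema alongside the records, formatSettings.schema stays optional. The following is a rough sketch; the schema and sample event are assumptions for illustration, not the contents of any published kttm-avro.ocf file:

from fastavro import parse_schema, writer

schema = parse_schema({
    "type": "record",
    "namespace": "io.imply",
    "name": "KttmEvent",
    "fields": [
        {"name": "timestamp", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "session", "type": "string"},
        {"name": "session_length", "type": "long"}
    ]
})

records = [
    {"timestamp": "2019-08-20T00:00:00Z", "city": "Helsinki",
     "session": "S1234", "session_length": 42}
]

# writer() produces an Avro Object Container File: schema plus records.
with open("kttm-avro.ocf", "wb") as f:
    writer(f, schema, records)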

Sample request

The following example request creates an ingestion job that reads an Avro OCF file named kttm-avro.ocf:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "koalas_avro_ocf"
  },
  "source": {
    "type": "uploaded",
    "fileList": [
      "kttm-avro.ocf"
    ],
    "inputSchema": [
      {"name": "timestamp", "dataType": "string"},
      {"name": "city", "dataType": "string"},
      {"name": "session", "dataType": "string"},
      {"name": "session_length", "dataType": "float"}
    ],
    "formatSettings": {
      "format": "avro_ocf"
    }
  },
  "mappings": [
    {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
    {"columnName": "city", "expression": "\"city\""},
    {"columnName": "session", "expression": "\"session\""},
    {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
  ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "batch",
    "target": {
        "type": "table",
        "tableName": "koalas_avro_ocf"
    },
    "source": {
        "type": "uploaded",
        "fileList": [
            "kttm-avro.ocf"
        ],
        "inputSchema": [
            {"name": "timestamp", "dataType": "string"},
            {"name": "city", "dataType": "string"},
            {"name": "session", "dataType": "string"},
            {"name": "session_length", "dataType": "float"}
        ],
        "formatSettings": {
            "format": "avro_ocf"
        }
    },
    "mappings": [
        {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
        {"columnName": "city", "expression": "\"city\""},
        {"columnName": "session", "expression": "\"session\""},
        {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
    "type": "batch",
    "target": {
        "tableName": "koalas_avro_ocf",
        "intervals": [],
        "type": "table"
    },
    "source": {
        "fileList": [
            "kttm-avro.ocf"
        ],
        "formatSettings": {
            "binaryAsString": false,
            "extractUnionsByType": false,
            "schema": {},
            "format": "avro_ocf"
        },
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "float",
                "name": "session_length"
            }
        ],
        "type": "uploaded"
    },
    "ingestionMode": "append",
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ],
    "filterExpression": null,
    "id": "d3f03fbd-0aeb-4a6b-b856-fd1d6cf7fca1",
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-01-30T23:44:36.366926098Z",
    "lastUpdatedTimestamp": "2023-01-30T23:44:36.366926098Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}

Confluent Schema Registry

Before connecting to your Confluent Schema Registry, ensure that Schema Registry is enabled for your Confluent Cloud cluster, that you have an API key specific to Schema Registry, and that you know the endpoint for your Schema Registry server. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.
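
Before you configure the connection in Polaris, you can verify the endpoint and the Schema Registry API key directly against the Schema Registry REST API. Listing the registered subjects is a simple smoke test; the endpoint and credentials below are placeholders:

curl -u "SR_API_KEY:SR_API_SECRET" \
  https://psrc-xxxxx.us-east-1.aws.confluent.cloud/subjects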

The following diagram shows an example workflow for ingesting data from a Kafka topic in Confluent Cloud that references a schema from Confluent Schema Registry. The steps in yellow signify actions that occur outside of Polaris. Arrows signify references; for example, the ingestion job references the table, Confluent Cloud connection, and Confluent Schema Registry connection.

[Diagram: Schema Registry workflow]

This section provides more information on steps 6-8 of the example workflow.

Create a connection to Confluent Schema Registry

To create a connection to Confluent Schema Registry, send a POST request to the /v2/connections endpoint with a request payload that contains the following fields:

  • type: Set to confluent_schema_registry.
  • name: Provide a unique name for the connection.
  • description: Optionally describe the connection.
  • urls: Provide one or more Schema Registry endpoints.

See the Connections v2 API documentation for a description of required parameters.

Sample request

The following example request creates a connection named example_schema to Confluent Schema Registry:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "confluent_schema_registry",
  "name": "example_schema",
  "urls": [
    "https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
  ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "confluent_schema_registry",
    "name": "example_schema",
    "urls": [
        "https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response and the details of the connection, for example:

{
    "type": "confluent_schema_registry",
    "name": "example_schema",
    "submittedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "submittedOnTimestamp": "2022-12-13T22:20:54Z",
    "modifiedByUser": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "modifiedOnTimestamp": "2022-12-13T22:20:54Z",
    "urls": [
        "https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
    ]
}

Authenticate to the Schema Registry

Add your authentication details to the Schema Registry connection by sending a PUT request to the /v2/connections/{name}/secrets endpoint. Replace {name} in the path with the name of your connection in Polaris.

The payload for the request depends on the authentication mode configured for your Confluent Schema Registry server. Confluent Schema Registry supports multiple authentication modes, such as HTTP Basic authentication and SASL authentication.

This example uses Basic authentication in which the username is the Confluent Schema Registry API key, and the password is the API secret. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.

See the Connections v2 API documentation for a description of required parameters.

Sample request

The following example adds the username and password to a Confluent Schema Registry connection using Basic authentication:

cURL

curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/example_schema/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "basic",
  "username": "XXXXXXXXXXXXXXXX",
  "password": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/example_schema/secrets"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "basic",
    "username": "XXXXXXXXXXXXXXXX",
    "password": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("PUT", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 200 OK response.

Start ingestion job

Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Job v2 API documentation for a full description of required parameters.

To ingest streaming data associated with schema from Confluent Schema Registry, define the following fields within formatSettings:

  • format: Set to avro_stream or protobuf.
  • parseSchemaProvider:
    • type: Set to connection.
    • connectionName: Pass the name of the Schema Registry connection.
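
Polaris can only resolve the schema for these records if the producer wrote them through Schema Registry serialization, that is, with the schema ID embedded in each record. As a rough sketch of the producer side, assuming the confluent-kafka Python client and using placeholder endpoints and credentials, producing matching Avro records might look like this:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Placeholder Schema Registry endpoint and credentials
schema_registry = SchemaRegistryClient({
    "url": "https://psrc-xxxxx.us-east-1.aws.confluent.cloud",
    "basic.auth.user.info": "SR_API_KEY:SR_API_SECRET"
})

# Avro schema for the event, matching the input fields used in this topic
schema_str = """
{
  "type": "record", "namespace": "io.imply", "name": "KttmEvent",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "city", "type": "string"},
    {"name": "session", "type": "string"},
    {"name": "session_length", "type": "long"}
  ]
}
"""

# Placeholder Confluent Cloud Kafka endpoint and credentials
producer = SerializingProducer({
    "bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "KAFKA_API_KEY",
    "sasl.password": "KAFKA_API_SECRET",
    # Registers the schema if needed and prepends the schema ID to each record
    "value.serializer": AvroSerializer(schema_registry, schema_str)
})

producer.produce("kttm", value={
    "timestamp": "2019-08-20T00:00:00Z",
    "city": "Helsinki",
    "session": "S1234",
    "session_length": 42
})
producer.flush()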

Sample request

The following example request creates an ingestion job that reads streaming Avro data using a schema from Confluent Schema Registry:

cURL

curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_sr_avro"
  },
  "source": {
    "type": "connection",
    "connectionName": "confluent_sr_avro",
    "inputSchema": [
      {"name": "timestamp", "dataType": "string"},
      {"name": "city", "dataType": "string"},
      {"name": "session", "dataType": "string"},
      {"name": "session_length", "dataType": "float"}
    ],
    "formatSettings": {
      "format": "avro_stream",
      "parseSchemaProvider": {
        "type": "connection",
        "connectionName": "example_schema"
      }
    }
  },
  "mappings": [
    {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
    {"columnName": "city", "expression": "\"city\""},
    {"columnName": "session", "expression": "\"session\""},
    {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
  ]
}'

Python

import os
import requests
import json

url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"

apikey = os.getenv("POLARIS_API_KEY")

payload = json.dumps({
    "type": "streaming",
    "target": {
        "type": "table",
        "tableName": "koalas_sr_avro"
    },
    "source": {
        "type": "connection",
        "connectionName": "confluent_sr_avro",
        "inputSchema": [
            {"name": "timestamp", "dataType": "string"},
            {"name": "city", "dataType": "string"},
            {"name": "session", "dataType": "string"},
            {"name": "session_length", "dataType": "float"}
        ],
        "formatSettings": {
            "format": "avro_stream",
            "parseSchemaProvider": {
                "type": "connection",
                "connectionName": "example_schema"
            }
        }
    },
    "mappings": [
        {"columnName": "__time", "expression": "TIME_PARSE(\"timestamp\")"},
        {"columnName": "city", "expression": "\"city\""},
        {"columnName": "session", "expression": "\"session\""},
        {"columnName": "max_session_length", "expression": "MAX(\"session_length\")"}
    ]
})
headers = {
    'Authorization': f'Basic {apikey}',
    'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
    "type": "streaming",
    "target": {
        "tableName": "koalas_sr_avro",
        "intervals": [],
        "type": "table"
    },
    "source": {
        "connectionName": "confluent_sr_avro",
        "inputSchema": [
            {
                "dataType": "string",
                "name": "timestamp"
            },
            {
                "dataType": "string",
                "name": "city"
            },
            {
                "dataType": "string",
                "name": "session"
            },
            {
                "dataType": "float",
                "name": "session_length"
            }
        ],
        "formatSettings": {
            "parseSchemaProvider": {
                "connectionName": "example_schema",
                "type": "connection"
            },
            "format": "avro_stream"
        },
        "type": "connection"
    },
    "mappings": [
        {
            "columnName": "__time",
            "expression": "TIME_PARSE(\"timestamp\")"
        },
        {
            "columnName": "city",
            "expression": "\"city\""
        },
        {
            "columnName": "session",
            "expression": "\"session\""
        },
        {
            "columnName": "max_session_length",
            "expression": "MAX(\"session_length\")"
        },
        {
            "columnName": "__count",
            "expression": "COUNT(*)"
        }
    ],
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "readFromPoint": "latest",
    "id": "b5b913c0-5608-48f2-9f2d-4fcc10e73d40",
    "desiredExecutionStatus": "running",
    "createdBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "lastModifiedBy": {
        "username": "api-key-pok_vipgj...bjjvyo",
        "userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
    },
    "executionStatus": "pending",
    "health": {
        "status": "ok"
    },
    "createdTimestamp": "2023-01-26T19:59:37.781867043Z",
    "lastUpdatedTimestamp": "2023-01-26T19:59:37.781867043Z",
    "startedTimestamp": null,
    "completedTimestamp": null
}
