
Specify data schema by API

Note: Project-less regional API resources have been deprecated and will be removed by the end of September 2024.

You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023. For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID

Projects created before September 29, 2023 can continue to use project-less URLs until the end of September 2024. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
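If you script these calls, you can compose the project-scoped base URL once and reuse it for every endpoint in this topic. The following is a minimal shell sketch. The function name polaris_base_url is hypothetical, and the arguments are placeholders for your own organization name, region, cloud provider, and project ID.

```shell
# Hypothetical helper: print the project-scoped regional base URL.
# $1 = organization name, $2 = region, $3 = cloud provider, $4 = project ID
polaris_base_url() {
  printf 'https://%s.%s.%s.api.imply.io/v1/projects/%s\n' "$1" "$2" "$3" "$4"
}
```

For example, "$(polaris_base_url ORGANIZATION_NAME REGION CLOUD_PROVIDER PROJECT_ID)/jobs" expands to the jobs endpoint used in the requests in this topic.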

When you create an ingestion job to load data into a table in Imply Polaris, you can define the schema of the input data and map the input fields to the columns of the Polaris table schema. The following data formats require additional schema information to describe the data:

  • Avro
  • Protobuf

Note: Avro and Protobuf formats are not supported for push streaming ingestion.

To ingest data from one of these formats, provide the schema for the data in your ingestion job request in source.formatSettings.parseSchemaProvider. The data schema can come from one of the following sources:

  • An inline specification in which you set the schema definition directly within the payload of the ingestion job request.
  • Confluent Schema Registry, a storage layer for storing, organizing, and retrieving schemas.

Note: If you produce Kafka records using Schema Registry, you must also configure Schema Registry in your Polaris ingestion. You cannot use an inline schema specification to read records that contain both a schema ID and the serialized data.
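Records written by a Confluent Schema Registry serializer use the Confluent wire format: a magic byte of 0, then a 4-byte big-endian schema ID, then the serialized payload. These are the records that contain both a schema ID and the serialized data. As a rough local check, assuming you have saved the raw value bytes of one record to a file, the following hypothetical shell function prints the embedded schema ID, or fails if the record does not start with the magic byte.

```shell
# Hypothetical helper: print the schema ID from a record stored in the
# Confluent wire format (magic byte 0x00, 4-byte big-endian ID, payload).
# $1 = file holding the raw bytes of a single Kafka record value
confluent_schema_id() {
  # Read the first five bytes as unsigned decimal integers.
  header=$(od -A n -t u1 -N 5 "$1") || return 1
  # Intentionally unquoted: split the five numbers into $1..$5.
  set -- $header
  [ "$#" -eq 5 ] && [ "$1" -eq 0 ] || return 1
  echo $(( ($2 << 24) | ($3 << 16) | ($4 << 8) | $5 ))
}
```

If this prints an ID, configure a Schema Registry connection rather than an inline schema for that topic.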

This topic walks you through providing the schema of your input data, either inline in the job request or through Confluent Schema Registry.

Prerequisites

This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs.

To follow along in this topic, you need the following:

  • An API key with the ManageIngestionJobs role to create and manage ingestion jobs. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on roles and their permissions, visit User roles reference.

  • A connection to a streaming source. The examples in this topic use Confluent Cloud connections whose names start with confluent_, but you can use any supported streaming source. For a list of the sources that Polaris supports for data ingestion, see Ingestion sources overview.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables. Use a different table name for each ingestion job example in this topic.

Inline schema

This section walks you through creating an ingestion job from a Confluent Cloud connection while specifying the input schema inline in the job request. Each ingestion job is treated as a separate standalone example.

Avro

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Jobs v1 API documentation for a full description of required parameters.

To ingest streaming Avro data using an inline schema specification, define the following fields within formatSettings:

  • format: Set to avro_stream.
  • parseSchemaProvider:
    • type: Set to inline-avro.
    • schema: Pass the JSON object representing the Avro schema for the input data.

The input data in this example is defined by the following Avro schema:

{
  "type": "record",
  "namespace": "io.imply",
  "name": "KttmEvent",
  "fields": [
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "city",
      "type": "string"
    },
    {
      "name": "session",
      "type": "string"
    },
    {
      "name": "session_length",
      "type": "long"
    }
  ]
}

For more information on the Avro data serialization system, see the Apache Avro documentation.
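If you keep the Avro schema in its own file, for example a hypothetical kttm.avsc containing the schema above, a small shell helper can splice it into the formatSettings object so the schema in the job request matches the one your producers use. This is a sketch: the function name is hypothetical, and it assumes the file holds a single valid JSON schema without validating it.

```shell
# Hypothetical helper: wrap an Avro schema file in the formatSettings
# object for an inline-avro streaming ingestion job.
# $1 = path to a file containing the Avro schema as JSON
inline_avro_format_settings() {
  if [ ! -r "$1" ]; then
    echo "cannot read schema file: $1" >&2
    return 1
  fi
  # The schema is already JSON, so it is inserted verbatim as the value of "schema".
  printf '{"format": "avro_stream", "parseSchemaProvider": {"type": "inline-avro", "schema": %s}}\n' "$(cat "$1")"
}
```

The output can then be used as the value of source.formatSettings in the job payload.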

Sample request

The following example request creates an ingestion job that reads streaming Avro data using an inline schema:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_avro"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_avro",
    "inputSchema": [
      {
        "name": "timestamp",
        "dataType": "string"
      },
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "session",
        "dataType": "string"
      },
      {
        "name": "session_length",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "avro_stream",
      "parseSchemaProvider": {
        "type": "inline-avro",
        "schema": {
          "type": "record",
          "namespace": "io.imply",
          "name": "KttmEvent",
          "fields": [
            {
              "name": "timestamp",
              "type": "string"
            },
            {
              "name": "city",
              "type": "string"
            },
            {
              "name": "session",
              "type": "string"
            },
            {
              "name": "session_length",
              "type": "long"
            }
          ]
        }
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
  "type": "streaming",
  "id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_avro"
  },
  "createTableIfNotExists": true,
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2022-11-10T20:51:50.753651533Z",
  "lastUpdatedTimestamp": "2022-11-10T20:51:50.753651533Z",
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_avro",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "float",
        "name": "session_length"
      }
    ],
    "formatSettings": {
      "format": "avro_stream",
      "parseSchemaProvider": {
        "type": "inline-avro",
        "schema": {
          "type": "record",
          "namespace": "io.imply",
          "name": "KttmEvent",
          "fields": [
            {
              "name": "timestamp",
              "type": "string"
            },
            {
              "name": "city",
              "type": "string"
            },
            {
              "name": "session",
              "type": "string"
            },
            {
              "name": "session_length",
              "type": "long"
            }
          ]
        }
      },
      "binaryAsString": false,
      "extractUnionsByType": false
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    },
    {
      "columnName": "__count",
      "expression": "COUNT(*)"
    }
  ],
  "readFromPoint": "latest"
}
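The id field in the response identifies the new ingestion job. If you script job creation, you may want to capture it. With jq installed, jq -r '.id' does this. The following hypothetical shell function is a sed-based fallback that assumes, as in the sample responses in this topic, that the top-level id is the only key named exactly "id".

```shell
# Hypothetical helper: read a job response on stdin and print its "id" value.
# Assumes "id" occurs once as a key, as in the sample responses in this topic.
job_id_from_response() {
  sed -n 's/.*"id": *"\([^"]*\)".*/\1/p' | head -n 1
}
```

For example, pipe the output of the curl request into job_id_from_response and store the result in a variable.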

Protobuf

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Jobs v1 API documentation for a full description of required parameters.

To ingest Protobuf data using an inline schema specification, define the following fields within formatSettings:

  • format: Set to protobuf.
  • parseSchemaProvider:
    • type: Set to inline-protobuf.
    • descriptor: Pass a Base64-encoded string of the compiled Protobuf descriptor.

The input data in this example is defined by the following Protocol Buffers message in kttm.proto:

syntax = "proto3";

message SomeMessage {
  string timestamp = 1;
  string city = 2;
  string session = 3;
  uint32 session_length = 4;
}

Use protoc to compile the message schema in the .proto file into a descriptor. For example:

protoc --include_imports --descriptor_set_out=kttm.desc kttm.proto
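The descriptor field expects the compiled descriptor as a single Base64 string. The following shell sketch, using a hypothetical helper name, encodes a descriptor file such as kttm.desc with the standard base64 utility and wraps it in the inline-protobuf formatSettings object. Some base64 implementations wrap long output across lines, so the sketch removes newlines.

```shell
# Hypothetical helper: Base64-encode a compiled Protobuf descriptor and print
# the formatSettings object for an inline-protobuf ingestion job.
# $1 = path to the descriptor file produced by protoc --descriptor_set_out
inline_protobuf_format_settings() {
  if [ ! -r "$1" ]; then
    echo "cannot read descriptor file: $1" >&2
    return 1
  fi
  # Strip line wrapping so the descriptor is one JSON string.
  descriptor=$(base64 < "$1" | tr -d '\n')
  printf '{"format": "protobuf", "parseSchemaProvider": {"type": "inline-protobuf", "descriptor": "%s"}}\n' "$descriptor"
}
```

For example, inline_protobuf_format_settings kttm.desc prints an object of the same shape as the formatSettings in the following request.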

For more information on Protobuf, see the Google documentation.

Sample request

The following example request creates an ingestion job that reads streaming Protobuf data using an inline schema:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_protobuf"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_protobuf",
    "inputSchema": [
      {
        "name": "timestamp",
        "dataType": "string"
      },
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "session",
        "dataType": "string"
      },
      {
        "name": "session_length",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "protobuf",
      "parseSchemaProvider": {
        "type": "inline-protobuf",
        "descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
  "type": "streaming",
  "id": "dd1041e1-52d5-416d-9fb3-907613a0a678",
  "target": {
    "type": "table",
    "tableName": "koalas_inline_protobuf"
  },
  "createTableIfNotExists": true,
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2022-11-10T21:19:51.20383234Z",
  "lastUpdatedTimestamp": "2022-11-10T21:19:51.20383234Z",
  "source": {
    "type": "connection",
    "connectionName": "confluent_inline_protobuf",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "float",
        "name": "session_length"
      }
    ],
    "formatSettings": {
      "format": "protobuf",
      "parseSchemaProvider": {
        "type": "inline-protobuf",
        "descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    },
    {
      "columnName": "__count",
      "expression": "COUNT(*)"
    }
  ],
  "readFromPoint": "latest"
}

Avro OCF

Avro Object Container File (OCF) is a file format that you can use in batch ingestion jobs. Ingesting Avro OCF data is similar to ingesting Avro data. To ingest Avro OCF data, define the following fields within formatSettings:

  • format: Set to avro_ocf.
  • schema: Optionally pass the JSON object representing the Avro schema for the input data. Because an Avro OCF file embeds the schema used to write it, you only need schema to override the schema in the file.

Set the job type to batch, set the source type to uploaded, and list the names of the files to ingest in fileList.

See the Jobs v1 API documentation for a full description of required parameters.

This example uses the Avro OCF file kttm-avro.ocf. Be sure to upload the file to the Polaris staging area before starting the ingestion job.
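As a quick local check before uploading, you can confirm that a file at least starts with the Avro object container header, which the Avro specification defines as the four bytes O, b, j, 0x01. The following hypothetical shell function checks only that header, not the rest of the file.

```shell
# Hypothetical helper: succeed if the file starts with the Avro OCF magic
# bytes 'O' 'b' 'j' 0x01. Only the header is checked, not the whole file.
# $1 = path to the candidate .ocf file
is_avro_ocf() {
  magic=$(od -A n -t x1 -N 4 "$1" 2>/dev/null | tr -d ' \n')
  [ "$magic" = "4f626a01" ]
}
```

For example, is_avro_ocf kttm-avro.ocf && echo "header looks like Avro OCF".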

Sample request

The following example request creates an ingestion job that reads an Avro OCF file named kttm-avro.ocf:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "batch",
  "target": {
    "type": "table",
    "tableName": "koalas_avro_ocf"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "uploaded",
    "fileList": [
      "kttm-avro.ocf"
    ],
    "inputSchema": [
      {
        "name": "timestamp",
        "dataType": "string"
      },
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "session",
        "dataType": "string"
      },
      {
        "name": "session_length",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "avro_ocf"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
  "type": "batch",
  "target": {
    "tableName": "koalas_avro_ocf",
    "intervals": [],
    "type": "table"
  },
  "createTableIfNotExists": true,
  "source": {
    "fileList": [
      "kttm-avro.ocf"
    ],
    "formatSettings": {
      "binaryAsString": false,
      "extractUnionsByType": false,
      "schema": {},
      "format": "avro_ocf"
    },
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "float",
        "name": "session_length"
      }
    ],
    "type": "uploaded"
  },
  "ingestionMode": "append",
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    },
    {
      "columnName": "__count",
      "expression": "COUNT(*)"
    }
  ],
  "filterExpression": null,
  "id": "d3f03fbd-0aeb-4a6b-b856-fd1d6cf7fca1",
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2023-01-30T23:44:36.366926098Z",
  "lastUpdatedTimestamp": "2023-01-30T23:44:36.366926098Z",
  "startedTimestamp": null,
  "completedTimestamp": null
}

Confluent Schema Registry

Before connecting to your Confluent Schema Registry, ensure that Schema Registry is enabled for your Confluent Cloud cluster, you have an API key specific to Schema Registry, and you know the endpoint for your Schema Registry server. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.

The following diagram shows an example workflow for ingesting data from a Kafka topic in Confluent Cloud that references a schema from Confluent Schema Registry. The steps in yellow signify actions that occur outside of Polaris. Arrows signify references; for example, the ingestion job references the table, Confluent Cloud connection, and Confluent Schema Registry connection.

[Diagram: Schema Registry workflow]

This section describes steps 6-8 of the example workflow in more detail.

Create a connection to Confluent Schema Registry

To create a connection to Confluent Schema Registry, send a POST request whose payload contains the following fields:

  • type: Set to confluent_schema_registry.
  • name: Provide a unique name for the connection.
  • description: Optionally describe the connection.
  • urls: Provide one or more Schema Registry endpoints.
  • secrets: Provide the credentials that Polaris uses to authenticate to Confluent Schema Registry.

The value for secrets.type depends on the authentication mode configured for your Confluent Schema Registry server.

Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection. See the Connections API documentation for a description of required parameters.

Sample request

The following example request creates a connection named example_schema to Confluent Schema Registry.

This example uses Basic authentication in which the username is the Confluent Schema Registry API key, and the password is the API secret. The environment variables $CSR_USERNAME and $CSR_PASSWORD store the Confluent Schema Registry API key and API secret, respectively. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "confluent_schema_registry",
  "name": "example_schema",
  "urls": [
    "https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
  ],
  "secrets": {
    "type": "basic",
    "username": "'"$CSR_USERNAME"'",
    "password": "'"$CSR_PASSWORD"'"
  }
}'
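Shell variables are not expanded inside single quotes, so a single-quoted payload must close the quoted string wherever a variable is substituted. An alternative that avoids this quoting is an unquoted heredoc, sketched below with a hypothetical function name. The sketch does not JSON-escape its inputs, so it assumes the API key, secret, connection name, and URL contain no double quotes or backslashes.

```shell
# Hypothetical helper: print a Confluent Schema Registry connection payload
# that uses Basic authentication with credentials from the environment.
# $1 = connection name, $2 = Schema Registry endpoint URL
# Note: values are not JSON-escaped; avoid quotes and backslashes in them.
csr_connection_payload() {
  if [ -z "${CSR_USERNAME:-}" ] || [ -z "${CSR_PASSWORD:-}" ]; then
    echo "CSR_USERNAME and CSR_PASSWORD must be set" >&2
    return 1
  fi
  # The heredoc delimiter is unquoted, so the variables below are expanded.
  cat <<EOF
{
  "type": "confluent_schema_registry",
  "name": "$1",
  "urls": ["$2"],
  "secrets": {
    "type": "basic",
    "username": "$CSR_USERNAME",
    "password": "$CSR_PASSWORD"
  }
}
EOF
}
```

To send it, pipe the output to the same curl command and replace the --data-raw argument with --data-binary @-, which makes curl read the request body from standard input.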

Sample response

A successful request returns a 200 OK response and the details of the new connection, for example:

{
  "secrets": {
    "username": "$CSR_USERNAME",
    "type": "basic"
  },
  "urls": [
    "https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
  ],
  "type": "confluent_schema_registry",
  "name": "example_schema",
  "description": null,
  "submittedByUser": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "submittedOnTimestamp": "2023-03-28T17:46:07.154905Z",
  "modifiedByUser": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "modifiedOnTimestamp": "2023-03-28T17:46:07.154905Z"
}

Start ingestion job

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Jobs v1 API documentation for a full description of required parameters.

To ingest streaming data whose schema is stored in Confluent Schema Registry, define the following fields within formatSettings:

  • format: Set to avro_stream or protobuf.
  • parseSchemaProvider:
    • type: Set to connection.
    • connectionName: Pass the name of the Schema Registry connection.
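The Avro and Protobuf variants of these settings differ only in format. The following hypothetical shell helper prints the formatSettings object for either format and rejects any other value, as a sketch of the fields listed above.

```shell
# Hypothetical helper: print formatSettings that read the schema from a
# Schema Registry connection.
# $1 = format (avro_stream or protobuf), $2 = Schema Registry connection name
registry_format_settings() {
  case "$1" in
    avro_stream|protobuf) ;;
    *) echo "unsupported format: $1" >&2; return 1 ;;
  esac
  printf '{"format": "%s", "parseSchemaProvider": {"type": "connection", "connectionName": "%s"}}\n' "$1" "$2"
}
```

For example, registry_format_settings avro_stream example_schema produces the same settings as the formatSettings in the following request, apart from whitespace.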

Sample request

The following example request creates an ingestion job that reads streaming Avro data using a schema from Confluent Schema Registry:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "koalas_sr_avro"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "confluent_sr_avro",
    "inputSchema": [
      {
        "name": "timestamp",
        "dataType": "string"
      },
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "session",
        "dataType": "string"
      },
      {
        "name": "session_length",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "avro_stream",
      "parseSchemaProvider": {
        "type": "connection",
        "connectionName": "example_schema"
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

{
  "type": "streaming",
  "target": {
    "tableName": "koalas_sr_avro",
    "intervals": [],
    "type": "table"
  },
  "createTableIfNotExists": true,
  "source": {
    "connectionName": "confluent_sr_avro",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "timestamp"
      },
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "string",
        "name": "session"
      },
      {
        "dataType": "float",
        "name": "session_length"
      }
    ],
    "formatSettings": {
      "parseSchemaProvider": {
        "connectionName": "example_schema",
        "type": "connection"
      },
      "format": "avro_stream"
    },
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "session",
      "expression": "\"session\""
    },
    {
      "columnName": "max_session_length",
      "expression": "MAX(\"session_length\")",
      "isAggregation": true
    },
    {
      "columnName": "__count",
      "expression": "COUNT(*)"
    }
  ],
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "readFromPoint": "latest",
  "id": "b5b913c0-5608-48f2-9f2d-4fcc10e73d40",
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "lastModifiedBy": {
    "username": "api-key-pok_vipgj...bjjvyo",
    "userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2023-01-26T19:59:37.781867043Z",
  "lastUpdatedTimestamp": "2023-01-26T19:59:37.781867043Z",
  "startedTimestamp": null,
  "completedTimestamp": null
}
