Skip to main content

Specify data schema by API

When you create an ingestion job to load data into a table in Imply Polaris, you can define the schema of the input data and map the input fields to the columns of the Polaris table schema. The following data formats require additional schema information to describe the data:

Note that Polaris doesn't support Avro and Protobuf formats for push streaming ingestion.

To ingest data from one of these formats, provide the schema for the data in your ingestion job request in source.formatSettings.parseSchemaProvider. Choose one of the following options to provide your schema, depending on how you publish your events in Kafka:

  • Inline schema specification: Set the schema definition directly within the payload of the ingestion job request. Choose inline schema when your message wasn't published with a schema registry.

  • Confluent Schema Registry: Create a connection to your Schema Registry, and reference the connection in your ingestion job spec. Choose this option when your message was published using a schema registry.

tip

If you produced Kafka records using Schema Registry, you must also configure Schema Registry in your Polaris ingestion. You cannot use an inline schema specification to read records that contain both a schema ID and the serialized data.

This topic walks you through how to provide the schema of your input data, directly within the job request as well as using Confluent Schema Registry.

To use the UI, see Specify the data schema.

Prerequisites

This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. For more information, see the following:

To follow along in this topic, you need the following:

  • An API key with the ManageIngestionJobs role to create and manage ingestion jobs. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on roles and their permissions, visit User roles reference.

  • A connection to a streaming source. This topic uses example Confluent Cloud connections starting with confluent_. Confluent Cloud is used to illustrate the examples in this topic although you can use any supported streaming source. For a list of the sources that Polaris supports for data ingestion, see Ingestion sources overview.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables. Use a different table name for each ingestion job example in this topic.

Inline schema

This section walks you through creating an ingestion job from a Confluent Cloud connection while specifying the input schema inline in the job request. Each ingestion job is treated as a separate standalone example.

Avro

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Jobs v1 API documentation for a full description of required parameters.

To ingest streaming Avro data using an inline schema specification, define the following fields within formatSettings:

  • format: Set to avro_stream.
  • parseSchemaProvider:
    • type: Set to inline-avro.
    • schema: Pass the JSON object representing the Avro schema for the input data.

The input data in this example is defined by the following Avro schema:

{
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}

For more information on the Avro data serialization system, see the Apache Avro documentation.

Sample request

The following example request creates an ingestion job that reads streaming Avro data using an inline schema:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

Click to view the response
{
"type": "streaming",
"id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T20:51:50.753651533Z",
"lastUpdatedTimestamp": "2022-11-10T20:51:50.753651533Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
},
"binaryAsString": false,
"extractUnionsByType": false
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}

Protobuf

The example in this section uses the following Protobuf message schema saved in a file named kttm.proto:

syntax = "proto3";

message SomeMessage {
string timestamp = 1;
string city = 2;
string session = 3;
uint32 session_length = 4;
}

You can use protoc to compile the Protobuf schema into a descriptor file. For example:

protoc --include_imports --descriptor_set_out=kttm.desc kttm.proto

To ingest Protobuf data using an inline schema, pass the Base64-encoded content of the compiled Protobuf descriptor (kttm.desc in the example) in the ingestion job spec.

In the request payload, describe the data format settings in source.formatSettings:

  • format: Set to protobuf.

  • parseSchemaProvider:

    • type: Set to inline-protobuf.
    • descriptor: Base64-encoded string of the compiled Protobuf descriptor.
  • protoMessageType: Optional. Name of the Protobuf message type. If you don't specify a message type, the parser uses the first message type found in the descriptor.

    Polaris accepts either a short name or a fully qualified name for the message type. For example, in the following .proto snippet, both Person and tutorial.Person are valid message types.

    package tutorial;
    message Person {
    ...

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create the streaming ingestion job. See the Jobs v1 API documentation for a full description of required parameters.

For more information on Protobuf, see the Google documentation.

Sample request

The following example request creates an ingestion job that reads streaming Protobuf data using an inline schema:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

Click to view the response
{
"type": "streaming",
"id": "dd1041e1-52d5-416d-9fb3-907613a0a678",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T21:19:51.20383234Z",
"lastUpdatedTimestamp": "2022-11-10T21:19:51.20383234Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}

Avro OCF

Avro Object Container File a file format that you can use in batch ingestion jobs. Ingesting data in Avro OCF format is similar to that of Avro format. To ingest Avro OCF data, define the following fields within formatSettings:

  • format: Set to avro_ocf.
  • schema: Optionally pass the JSON object representing the Avro schema for the input data. For Avro OCF, the file itself contains the schema used to generate the file. You only need schema if you want to override the schema in the file.

Ensure the job type is batch and that you define the job source as uploaded with the correct filenames in fileList.

See the Jobs v1 API documentation for a full description of required parameters.

This example uses the Avro OCF file kttm-avro.ocf. Be sure to upload the file to the Polaris staging area before starting the ingestion job.

Sample request

The following example request creates an ingestion job that reads an Avro OCF file named kttm-avro.ocf:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "koalas_avro_ocf"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"kttm-avro.ocf"
],
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_ocf"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

Click to view the response
{
"type": "batch",
"target": {
"tableName": "koalas_avro_ocf",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"source": {
"fileList": [
"kttm-avro.ocf"
],
"formatSettings": {
"binaryAsString": false,
"extractUnionsByType": false,
"schema": {},
"format": "avro_ocf"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"type": "uploaded"
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"id": "d3f03fbd-0aeb-4a6b-b856-fd1d6cf7fca1",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-30T23:44:36.366926098Z",
"lastUpdatedTimestamp": "2023-01-30T23:44:36.366926098Z",
"startedTimestamp": null,
"completedTimestamp": null
}

Confluent Schema Registry

Before connecting to your Confluent Schema Registry, ensure that Schema Registry is enabled for your Confluent Cloud cluster, you have an API key specific to Schema Registry, and you know the endpoint for your Schema Registry server. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.

The following diagram shows an example workflow for ingesting data from a Kafka topic in Confluent Cloud that references a schema from Confluent Schema Registry. The steps in yellow signify actions that occur outside of Polaris. Arrows signify references; for example, the ingestion job references the table, Confluent Cloud connection, and Confluent Schema Registry connection.

Schema registry workflow

This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. This section provides more information on steps 6-8 of the example workflow.

Create a connection to Confluent Schema Registry

To create a connection to Confluent Schema Registry, send a POST request with a request payload with the following fields:

  • type: Set to confluent_schema_registry.

  • name: Provide a unique name for the connection.

  • description: Optionally describe the connection.

  • urls: Provide one or more Schema Registry endpoints.

  • secrets: Add your authentication details using Basic authentication with your Confluent Schema Registry API key.

    The secrets username is the Confluent Schema Registry API key, and the secrets password is the API secret. For information on creating API keys in Confluent Schema Registry, refer to the Confluent documentation.

Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection. See the Connections API documentation for a description of required parameters.

Sample request

The following example request creates a connection named example_schema to Confluent Schema Registry.

This example uses the environment variables $CSR_USERNAME and $CSR_PASSWORD to store the Confluent Schema Registry API key and API secret, respectively.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "confluent_schema_registry",
"name": "example_schema",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
],
"secrets": {
"type": "basic",
"username": "$CSR_USERNAME",
"password": "$CSR_PASSWORD"
}
}'

Sample response

A successful request returns a 200 OK response and the details of the successful connection, for example:

{
"secrets": {
"username": "$CSR_USERNAME",
"type": "basic"
},
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
],
"type": "confluent_schema_registry",
"name": "example_schema",
"description": null,
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2023-03-28T17:46:07.154905Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2023-03-28T17:46:07.154905Z"
}

Start ingestion job

Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings. See the Jobs v1 API documentation for a full description of required parameters.

To ingest streaming data associated with schema from Confluent Schema Registry, define the following fields within formatSettings:

  • format: Set to avro_stream or protobuf.
  • parseSchemaProvider:
    • type: Set to connection.
    • connectionName: Pass the name of the Schema Registry connection.

Sample request

The following example request creates an ingestion job that reads streaming Avro data using a schema from Confluent Schema Registry:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_sr_avro"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "connection",
"connectionName": "example_schema"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'

Sample response

A successful request returns a 201 Created response and the details of the ingestion job:

Click to view the response
{
"type": "streaming",
"target": {
"tableName": "koalas_sr_avro",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"source": {
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"parseSchemaProvider": {
"connectionName": "example_schema",
"type": "connection"
},
"format": "avro_stream"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"id": "b5b913c0-5608-48f2-9f2d-4fcc10e73d40",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-26T19:59:37.781867043Z",
"lastUpdatedTimestamp": "2023-01-26T19:59:37.781867043Z",
"startedTimestamp": null,
"completedTimestamp": null
}

Learn more

See the following topics for more information: