Specify data schema by API
When you create an ingestion job to load data into a table in Imply Polaris, you can define the schema of the input data and map the input fields to the columns of the Polaris table schema. The following data formats require additional schema information to describe the data:
Note that Polaris doesn't support Avro and Protobuf formats for push streaming ingestion.
To ingest data from one of these formats, provide the schema for the data in your ingestion job request in source.formatSettings.parseSchemaProvider.
Choose one of the following options to provide your schema, depending on how you publish your events in Kafka:
Inline schema specification: Set the schema definition directly within the payload of the ingestion job request. Choose inline schema when your message wasn't published with a schema registry.
Confluent Schema Registry: Create a connection to your Schema Registry, and reference the connection in your ingestion job spec. Choose this option when your message was published using a schema registry.
If you produced Kafka records using Schema Registry, you must also configure Schema Registry in your Polaris ingestion. You cannot use an inline schema specification to read records that contain both a schema ID and the serialized data.
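One way to tell which option applies is to inspect a raw record value from the topic. Records serialized through Confluent Schema Registry follow the Confluent wire format: a zero magic byte, then a 4-byte big-endian schema ID, then the serialized data. The following sketch checks for that prefix. The function name and the sample bytes are hypothetical, and the check is only a heuristic, since a record produced without a registry could also begin with a zero byte.

```python
import struct
from typing import Optional


def confluent_schema_id(record: bytes) -> Optional[int]:
    """Return the schema ID if the record starts with a Confluent wire-format prefix, else None."""
    # The prefix is 1 magic byte (0) followed by a 4-byte big-endian schema ID.
    if len(record) < 5 or record[0] != 0:
        return None
    (schema_id,) = struct.unpack(">I", record[1:5])
    return schema_id


# Hypothetical sample values: one framed with a schema ID, one without.
framed = b"\x00" + struct.pack(">I", 100042) + b"\x0cSydney"
plain = b"\x0cSydney"

for label, value in (("framed", framed), ("plain", plain)):
    schema_id = confluent_schema_id(value)
    if schema_id is None:
        print(f"{label}: no schema ID prefix found; an inline schema may apply")
    else:
        print(f"{label}: schema ID {schema_id}; use a Schema Registry connection")
```

If the check finds a schema ID, the record carries both the ID and the serialized data, which is the case that requires a Schema Registry connection.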
This topic walks you through providing the schema of your input data, either directly within the job request or through Confluent Schema Registry.
To use the UI, see Specify the data schema.
Prerequisites
This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. For more information, see the following:
To follow along in this topic, you need the following:
- An API key with the ManageIngestionJobs role to create and manage ingestion jobs. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on roles and their permissions, visit User roles reference.
- A connection to a streaming source. This topic uses example Confluent Cloud connections starting with confluent_. Confluent Cloud is used to illustrate the examples in this topic although you can use any supported streaming source. For a list of the sources that Polaris supports for data ingestion, see Ingestion sources overview.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Use a different table name for each ingestion job example in this topic.
Inline schema
This section walks you through creating an ingestion job from a Confluent Cloud connection while specifying the input schema inline in the job request. Each ingestion job is treated as a separate standalone example.
Avro
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job.
In the request payload, you set the data format settings within source.formatSettings.
See the Jobs v1 API documentation for a full description of required parameters.
To ingest streaming Avro data using an inline schema specification, define the following fields within formatSettings:
- format: Set to avro_stream.
- parseSchemaProvider:
  - type: Set to inline-avro.
  - schema: Pass the JSON object representing the Avro schema for the input data.
The input data in this example is defined by the following Avro schema:
{
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
For more information on the Avro data serialization system, see the Apache Avro documentation.
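Because a job request names the input fields in both source.inputSchema and the Avro schema, a quick local check can catch a mismatch before you submit the job. The sketch below is a hypothetical helper, not part of the Polaris API: it wraps an Avro schema in the formatSettings shape described above and lists any inputSchema names that the Avro schema does not declare.

```python
import json

# The Avro schema used by the example in this section.
AVRO_SCHEMA = {
    "type": "record",
    "namespace": "io.imply",
    "name": "KttmEvent",
    "fields": [
        {"name": "timestamp", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "session", "type": "string"},
        {"name": "session_length", "type": "long"},
    ],
}


def inline_avro_format_settings(schema: dict) -> dict:
    """Build the formatSettings object for an inline Avro schema."""
    return {
        "format": "avro_stream",
        "parseSchemaProvider": {"type": "inline-avro", "schema": schema},
    }


def undeclared_fields(input_schema: list, avro_schema: dict) -> list:
    """Return inputSchema names that are missing from the Avro schema's fields."""
    declared = {field["name"] for field in avro_schema["fields"]}
    return [col["name"] for col in input_schema if col["name"] not in declared]


input_schema = [
    {"name": "timestamp", "dataType": "string"},
    {"name": "city", "dataType": "string"},
    {"name": "session", "dataType": "string"},
    {"name": "session_length", "dataType": "float"},
]

print(json.dumps(inline_avro_format_settings(AVRO_SCHEMA), indent=2))
print("Undeclared fields:", undeclared_fields(input_schema, AVRO_SCHEMA))
```

The check compares names only. It makes no assumption about how Polaris maps Avro types to inputSchema data types.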
Sample request
The following example request creates an ingestion job that reads streaming Avro data using an inline schema:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T20:51:50.753651533Z",
"lastUpdatedTimestamp": "2022-11-10T20:51:50.753651533Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
},
"binaryAsString": false,
"extractUnionsByType": false
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}
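When you script job creation, it can help to check the status code and pull a few fields out of the response body. The following is a minimal sketch, assuming the response has the shape shown in the sample above. The function name is hypothetical, and the sample dictionary is a trimmed copy of that response.

```python
def summarize_job(status_code: int, body: dict) -> str:
    """Return a one-line summary of a newly created job, or raise if creation failed."""
    if status_code != 201:
        raise RuntimeError(f"Job creation failed with HTTP {status_code}: {body}")
    return (
        f"job {body['id']} -> table {body['target']['tableName']} "
        f"(execution: {body['executionStatus']}, health: {body['health']['status']})"
    )


# Trimmed copy of the sample response, used here as test input.
sample_body = {
    "type": "streaming",
    "id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
    "target": {"type": "table", "tableName": "koalas_inline_avro"},
    "executionStatus": "pending",
    "health": {"status": "ok"},
}

print(summarize_job(201, sample_body))
```

With the requests example above, you would call the function as summarize_job(response.status_code, response.json()).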
Protobuf
The example in this section uses the following Protobuf message schema saved in a file named kttm.proto:
syntax = "proto3";
message SomeMessage {
string timestamp = 1;
string city = 2;
string session = 3;
uint32 session_length = 4;
}
You can use protoc to compile the Protobuf schema into a descriptor file. For example:
protoc --include_imports --descriptor_set_out=kttm.desc kttm.proto
To ingest Protobuf data using an inline schema, pass the Base64-encoded content of the compiled Protobuf descriptor (kttm.desc in the example) in the ingestion job spec.
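The encoding step can be done with the Python standard library. The following is a minimal sketch, assuming the descriptor was compiled to kttm.desc in the current directory. The function name is hypothetical.

```python
import base64
from pathlib import Path


def encode_descriptor(path) -> str:
    """Read a compiled Protobuf descriptor file and return its content as a Base64 string."""
    return base64.b64encode(Path(path).read_bytes()).decode("ascii")


if __name__ == "__main__":
    descriptor_path = Path("kttm.desc")  # output of the protoc command above
    if descriptor_path.exists():
        print(encode_descriptor(descriptor_path))
    else:
        print("kttm.desc not found; run protoc first")
```

The printed string is the value to pass as the descriptor in the job request.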
In the request payload, describe the data format settings in source.formatSettings:
- format: Set to protobuf.
- parseSchemaProvider:
  - type: Set to inline-protobuf.
  - descriptor: Base64-encoded string of the compiled Protobuf descriptor.
- protoMessageType: Optional. Name of the Protobuf message type. If you don't specify a message type, the parser uses the first message type found in the descriptor. Polaris accepts either a short name or a fully qualified name for the message type. For example, in the following .proto snippet, both Person and tutorial.Person are valid message types.
package tutorial;
message Person {
...
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create the streaming ingestion job.
See the Jobs v1 API documentation for a full description of required parameters.
For more information on Protobuf, see the Google documentation.
Sample request
The following example request creates an ingestion job that reads streaming Protobuf data using an inline schema:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"id": "dd1041e1-52d5-416d-9fb3-907613a0a678",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"createTableIfNotExists": true,
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T21:19:51.20383234Z",
"lastUpdatedTimestamp": "2022-11-10T21:19:51.20383234Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}
Avro OCF
Avro Object Container File (OCF) is a file format that you can use in batch ingestion jobs.
Ingesting Avro OCF data is similar to ingesting Avro data.
To ingest Avro OCF data, define the following fields within formatSettings:
- format: Set to avro_ocf.
- schema: Optionally pass the JSON object representing the Avro schema for the input data. For Avro OCF, the file itself contains the schema used to generate the file. You only need schema if you want to override the schema in the file.
Ensure the job type is batch and that you define the job source as uploaded with the correct filenames in fileList.
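The settings above can be assembled with a small helper that adds schema only when you want to override the schema embedded in the file. This is a sketch under that reading of the field descriptions. The function name is hypothetical and not part of the Polaris API.

```python
from typing import Optional


def avro_ocf_source(file_list: list, input_schema: list,
                    schema_override: Optional[dict] = None) -> dict:
    """Build the source object of a batch job that reads uploaded Avro OCF files."""
    format_settings = {"format": "avro_ocf"}
    if schema_override is not None:
        # Only needed to override the schema stored in the OCF file itself.
        format_settings["schema"] = schema_override
    return {
        "type": "uploaded",
        "fileList": list(file_list),
        "inputSchema": input_schema,
        "formatSettings": format_settings,
    }


source = avro_ocf_source(
    ["kttm-avro.ocf"],
    [
        {"name": "timestamp", "dataType": "string"},
        {"name": "city", "dataType": "string"},
    ],
)
print(source["formatSettings"])
```

The returned object goes under source in a job request whose type is batch.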
See the Jobs v1 API documentation for a full description of required parameters.
This example uses the Avro OCF file kttm-avro.ocf.
Be sure to upload the file to the Polaris staging area before starting the ingestion job.
Sample request
The following example request creates an ingestion job that reads an Avro OCF file named kttm-avro.ocf:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "koalas_avro_ocf"
},
"createTableIfNotExists": true,
"source": {
"type": "uploaded",
"fileList": [
"kttm-avro.ocf"
],
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_ocf"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "koalas_avro_ocf"
},
"createTableIfNotExists": True,
"source": {
"type": "uploaded",
"fileList": [
"kttm-avro.ocf"
],
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_ocf"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "batch",
"target": {
"tableName": "koalas_avro_ocf",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"source": {
"fileList": [
"kttm-avro.ocf"
],
"formatSettings": {
"binaryAsString": false,
"extractUnionsByType": false,
"schema": {},
"format": "avro_ocf"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"type": "uploaded"
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"id": "d3f03fbd-0aeb-4a6b-b856-fd1d6cf7fca1",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-30T23:44:36.366926098Z",
"lastUpdatedTimestamp": "2023-01-30T23:44:36.366926098Z",
"startedTimestamp": null,
"completedTimestamp": null
}
Confluent Schema Registry
Before connecting to your Confluent Schema Registry, ensure that Schema Registry is enabled for your Confluent Cloud cluster, you have an API key specific to Schema Registry, and you know the endpoint for your Schema Registry server. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.
The following diagram shows an example workflow for ingesting data from a Kafka topic in Confluent Cloud that references a schema from Confluent Schema Registry. The steps in yellow signify actions that occur outside of Polaris. Arrows signify references; for example, the ingestion job references the table, Confluent Cloud connection, and Confluent Schema Registry connection.
This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. This section provides more information on steps 6-8 of the example workflow.
Create a connection to Confluent Schema Registry
To create a connection to Confluent Schema Registry, send a POST request whose payload contains the following fields:
- type: Set to confluent_schema_registry.
- name: Provide a unique name for the connection.
- description: Optionally describe the connection.
- urls: Provide one or more Schema Registry endpoints.
- secrets: Add your authentication details using Basic authentication with your Confluent Schema Registry API key. The secrets username is the Confluent Schema Registry API key, and the secrets password is the API secret. For information on creating API keys in Confluent Schema Registry, refer to the Confluent documentation.
Send a POST request to the /v1/projects/PROJECT_ID/connections endpoint to create a connection.
See the Connections API documentation for a description of required parameters.
Sample request
The following example request creates a connection named example_schema to Confluent Schema Registry.
This example uses the environment variables $CSR_USERNAME and $CSR_PASSWORD to store the Confluent Schema Registry API key and API secret, respectively.
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "confluent_schema_registry",
"name": "example_schema",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
],
"secrets": {
"type": "basic",
"username": "'"$CSR_USERNAME"'",
"password": "'"$CSR_PASSWORD"'"
}
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections"
apikey = os.getenv("POLARIS_API_KEY")
csr_username = os.getenv("CSR_USERNAME")
csr_password = os.getenv("CSR_PASSWORD")
payload = json.dumps({
"type": "confluent_schema_registry",
"name": "example_schema",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
],
"secrets": {
"type": "basic",
"username": csr_username,
"password": csr_password
}
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the successful connection, for example:
{
"secrets": {
"username": "$CSR_USERNAME",
"type": "basic"
},
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
],
"type": "confluent_schema_registry",
"name": "example_schema",
"description": null,
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"submittedOnTimestamp": "2023-03-28T17:46:07.154905Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"modifiedOnTimestamp": "2023-03-28T17:46:07.154905Z"
}
Start ingestion job
Submit a POST request to the /v1/projects/PROJECT_ID/jobs endpoint to create a streaming ingestion job.
In the request payload, you set the data format settings within source.formatSettings.
See the Jobs v1 API documentation for a full description of required parameters.
To ingest streaming data associated with a schema from Confluent Schema Registry, define the following fields within formatSettings:
- format: Set to avro_stream or protobuf.
- parseSchemaProvider:
  - type: Set to connection.
  - connectionName: Pass the name of the Schema Registry connection.
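The fields above can be sketched as a small builder that rejects any format other than the two listed. The function and constant names are hypothetical, and the validation is a local convenience rather than documented Polaris behavior.

```python
# Formats that the field list above allows with a Schema Registry connection.
REGISTRY_FORMATS = {"avro_stream", "protobuf"}


def registry_format_settings(data_format: str, connection_name: str) -> dict:
    """Build formatSettings that reference a Schema Registry connection by name."""
    if data_format not in REGISTRY_FORMATS:
        raise ValueError(
            f"format must be one of {sorted(REGISTRY_FORMATS)}, got {data_format!r}"
        )
    return {
        "format": data_format,
        "parseSchemaProvider": {
            "type": "connection",
            "connectionName": connection_name,
        },
    }


print(registry_format_settings("avro_stream", "example_schema"))
```

Note that connectionName here is the Schema Registry connection, which is separate from the connectionName of the streaming source in the same job request.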
Sample request
The following example request creates an ingestion job that reads streaming Avro data using a schema from Confluent Schema Registry:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_sr_avro"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "connection",
"connectionName": "example_schema"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_sr_avro"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "connection",
"connectionName": "example_schema"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": True
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"target": {
"tableName": "koalas_sr_avro",
"intervals": [],
"type": "table"
},
"createTableIfNotExists": true,
"source": {
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"parseSchemaProvider": {
"connectionName": "example_schema",
"type": "connection"
},
"format": "avro_stream"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")",
"isAggregation": true
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"id": "b5b913c0-5608-48f2-9f2d-4fcc10e73d40",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-xxxxxxxxxxxx"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-26T19:59:37.781867043Z",
"lastUpdatedTimestamp": "2023-01-26T19:59:37.781867043Z",
"startedTimestamp": null,
"completedTimestamp": null
}
Learn more
See the following topics for more information: