Specify data schema by API
When you create an ingestion job to load data into a table in Imply Polaris,
you define the schema of the input data and map the input fields to the columns of the Polaris table schema.
Regardless of your source data format, all ingestion jobs require an input schema defined in the source.inputSchema field of the job specification, such as the following:
"source": {
"type": "uploaded",
"fileList": [
"kttm-2019-08-20.json.gz"
],
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "long",
"name": "session_length"
}
],
...
},
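As a minimal sketch, the inputSchema array can be generated from (name, dataType) pairs in Python. The helper name build_input_schema is hypothetical and not part of any Polaris client library; it only produces the JSON structure shown above.

```python
import json

def build_input_schema(fields):
    """Turn (name, dataType) pairs into the list used in source.inputSchema.

    build_input_schema is a hypothetical helper for illustration only.
    """
    return [{"dataType": data_type, "name": name} for name, data_type in fields]

# The two input fields from the example above.
input_schema = build_input_schema([("timestamp", "string"), ("session_length", "long")])
print(json.dumps({"inputSchema": input_schema}, indent=2))
```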
The following data formats require additional schema definition to describe the data:
- Avro
- Protobuf
Avro and Protobuf formats are not supported for push streaming ingestion.
To ingest data in one of these formats, you specify the input schema in source.inputSchema and also provide the schema for the data in source.formatSettings.parseSchemaProvider of your ingestion job request.
You provide the data schema using one of the following sources:
- An inline specification in which you set the schema definition directly within the payload of the ingestion job request.
- Confluent Schema Registry, a storage layer for storing, organizing, and retrieving schemas.
If you produce Kafka records using Schema Registry, you must also configure Schema Registry in your Polaris ingestion job. You cannot use an inline schema specification to read records that contain both a schema ID and the serialized data.
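Whether you need Schema Registry depends on how the Kafka records were produced. Records written by Confluent's Schema Registry serializers start with a magic byte (0x00) and a 4-byte big-endian schema ID, followed by the serialized data (for Protobuf, message indexes precede the message itself). The following Python sketch uses a hypothetical helper, split_confluent_header, to inspect a raw record value for that prefix. Treat it as a heuristic, not a definitive test.

```python
import struct

def split_confluent_header(record: bytes):
    """Return (schema_id, payload) for a record in the Confluent wire format, else None.

    Confluent serializers prefix each record with a magic byte (0x00) followed by
    a 4-byte big-endian schema ID. This is only a heuristic: data without that
    prefix could still start with 0x00 by coincidence.
    """
    if len(record) < 5 or record[0] != 0:
        return None
    (schema_id,) = struct.unpack(">I", record[1:5])
    return schema_id, record[5:]
```

If the function returns a schema ID for your records, configure a Schema Registry connection rather than an inline schema.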
This topic shows you how to provide the schema of your input data, either directly within the job request or by using Confluent Schema Registry.
Prerequisites
To follow along in this topic, you need the following:
- An API key with the ManageIngestionJobs role to create and manage ingestion jobs. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. See API key authentication to obtain an API key and assign permissions. Visit User roles reference for more information on roles and their permissions.
- A table with a defined schema. This topic uses the example table from Create a table with a schema. Create a separate table for each ingestion job example in this topic. The example table contains the following schema:
  - __time: the primary timestamp
  - city: a string dimension
  - session: a string dimension
  - max_session_length: a long measure
- A connection to a streaming source. This topic uses example Confluent Cloud connections whose names start with confluent_. Confluent Cloud illustrates the examples in this topic, although you can use any supported streaming source. For a list of the sources that Polaris supports for data ingestion, see Ingestion sources overview.
This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. For more information, see the following:
Inline schema
This section walks you through creating an ingestion job from a Confluent Cloud connection while specifying the input schema inline in the job request. Each ingestion job is treated as a separate standalone example.
Avro
Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings.
See the Job v2 API documentation for a full description of required parameters.
To ingest streaming Avro data using an inline schema specification, define the following fields within formatSettings:
- format: Set to avro_stream.
- parseSchemaProvider:
  - type: Set to inline-avro.
  - schema: Pass the JSON object representing the Avro schema for the input data.
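The fields above nest inside source.formatSettings. A minimal Python sketch of that nesting follows; inline_avro_format_settings is a hypothetical helper, and the trimmed schema in the usage line stands in for your full Avro schema.

```python
import json

def inline_avro_format_settings(avro_schema: dict) -> dict:
    """Build source.formatSettings for streaming Avro with an inline schema.

    inline_avro_format_settings is a hypothetical helper for illustration.
    """
    return {
        "format": "avro_stream",
        "parseSchemaProvider": {
            "type": "inline-avro",
            "schema": avro_schema,  # passed as a JSON object, as in the sample request
        },
    }

# A trimmed-down record schema; substitute your full Avro schema.
settings = inline_avro_format_settings(
    {"type": "record", "name": "KttmEvent", "fields": [{"name": "city", "type": "string"}]}
)
print(json.dumps(settings, indent=2))
```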
The input data in this example is defined by the following Avro schema:
{
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
For more information on the Avro data serialization system, see the Apache Avro documentation.
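Because source.inputSchema and the Avro schema are specified separately, a quick check that every input field name exists in the Avro record can catch typos before you submit the job. This is a minimal Python sketch: missing_fields is a hypothetical helper, and it compares names only, not types. For example, the sample request declares session_length as float in inputSchema while the Avro schema uses long.

```python
def missing_fields(input_schema, avro_schema):
    """Return input field names that the Avro record schema does not define.

    Compares names only; it does not check that the data types agree.
    """
    avro_names = {field["name"] for field in avro_schema.get("fields", [])}
    return [col["name"] for col in input_schema if col["name"] not in avro_names]

avro_schema = {
    "type": "record",
    "namespace": "io.imply",
    "name": "KttmEvent",
    "fields": [
        {"name": "timestamp", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "session", "type": "string"},
        {"name": "session_length", "type": "long"},
    ],
}
input_schema = [
    {"name": "timestamp", "dataType": "string"},
    {"name": "city", "dataType": "string"},
    {"name": "session", "dataType": "string"},
    {"name": "session_length", "dataType": "float"},
]
print(missing_fields(input_schema, avro_schema))  # prints []
```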
Sample request
The following example request creates an ingestion job that reads streaming Avro data using an inline schema:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"id": "ec5c7e91-3a23-4d92-b122-d2de3ab86db8",
"target": {
"type": "table",
"tableName": "koalas_inline_avro"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T20:51:50.753651533Z",
"lastUpdatedTimestamp": "2022-11-10T20:51:50.753651533Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "inline-avro",
"schema": {
"type": "record",
"namespace": "io.imply",
"name": "KttmEvent",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "session",
"type": "string"
},
{
"name": "session_length",
"type": "long"
}
]
}
},
"binaryAsString": false,
"extractUnionsByType": false
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}
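To act on the response in a script, you can confirm the 201 status and read fields such as id and executionStatus, which starts as pending in the sample. This is a minimal Python sketch; job_summary is a hypothetical helper, and the field names it reads are taken from the sample response.

```python
import json

def job_summary(status_code: int, body: str) -> dict:
    """Pull a few fields out of a create-job response; raise if the job was not created.

    job_summary is a hypothetical helper; field names match the sample response.
    """
    if status_code != 201:
        raise RuntimeError(f"job creation failed with HTTP {status_code}: {body}")
    job = json.loads(body)
    return {
        "id": job["id"],
        "table": job["target"]["tableName"],
        "executionStatus": job["executionStatus"],
    }

# With the Python sample request above:
# print(job_summary(response.status_code, response.text))
```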
Protobuf
Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings.
See the Job v2 API documentation for a full description of required parameters.
To ingest Protobuf data using an inline schema specification, define the following fields within formatSettings:
- format: Set to protobuf.
- parseSchemaProvider:
  - type: Set to inline-protobuf.
  - descriptor: Pass a Base64-encoded string of the compiled Protobuf descriptor.
The input data in this example is defined by the following Protocol Buffers message definition in kttm.proto:
syntax = "proto3";
message SomeMessage {
string timestamp = 1;
string city = 2;
string session = 3;
uint32 session_length = 4;
}
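For a message schema defined in a .proto file, use protoc to compile the descriptor, and then Base64-encode the compiled descriptor file to pass it in descriptor. For example, the following command compiles kttm.proto into kttm.desc: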
protoc --include_imports --descriptor_set_out=kttm.desc kttm.proto
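The protoc command writes a binary descriptor set to kttm.desc, while the descriptor field expects a Base64-encoded string. This minimal Python sketch does the encoding and wraps the result in formatSettings. It assumes kttm.desc exists locally, and inline_protobuf_format_settings is a hypothetical helper.

```python
import base64
from pathlib import Path

def inline_protobuf_format_settings(desc_path: str) -> dict:
    """Build source.formatSettings for Protobuf from a compiled descriptor set file.

    Reads the binary output of protoc (for example, kttm.desc) and Base64-encodes it
    for the descriptor field. inline_protobuf_format_settings is a hypothetical helper.
    """
    descriptor = base64.b64encode(Path(desc_path).read_bytes()).decode("ascii")
    return {
        "format": "protobuf",
        "parseSchemaProvider": {"type": "inline-protobuf", "descriptor": descriptor},
    }

# Usage, assuming kttm.desc is in the working directory:
# settings = inline_protobuf_format_settings("kttm.desc")
```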
For more information on Protobuf, see the Google documentation.
Sample request
The following example request creates an ingestion job that reads streaming Protobuf data using an inline schema:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
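Before you submit a request like the one above, you can sanity-check the descriptor string. Confirm that it is valid Base64 and that the decoded bytes mention the expected file, message, and field names. This is a rough byte-level sketch, not a full Protobuf parse, and check_descriptor is a hypothetical helper.

```python
import base64
import binascii

def check_descriptor(descriptor_b64: str, expected_names) -> bool:
    """Return True if the string is valid Base64 and every expected name appears in it.

    This is a rough byte-level check on the decoded descriptor set, not a full
    Protobuf parse; check_descriptor is a hypothetical helper.
    """
    try:
        raw = base64.b64decode(descriptor_b64, validate=True)
    except binascii.Error:
        return False
    return all(name.encode("utf-8") in raw for name in expected_names)
```

For the descriptor in this example, check_descriptor(descriptor, ["kttm.proto", "SomeMessage", "session_length"]) returns True.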
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"id": "dd1041e1-52d5-416d-9fb3-907613a0a678",
"target": {
"type": "table",
"tableName": "koalas_inline_protobuf"
},
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2022-11-10T21:19:51.20383234Z",
"lastUpdatedTimestamp": "2022-11-10T21:19:51.20383234Z",
"source": {
"type": "connection",
"connectionName": "confluent_inline_protobuf",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"format": "protobuf",
"parseSchemaProvider": {
"type": "inline-protobuf",
"descriptor": "CpcBCgprdHRtLnByb3RvIoABCgtTb21lTWVzc2FnZRIcCgl0aW1lc3RhbXAYASABKAlSCXRpbWVzdGFtcBISCgRjaXR5GAIgASgJUgRjaXR5EhgKB3Nlc3Npb24YAyABKAlSB3Nlc3Npb24SJQoOc2Vzc2lvbl9sZW5ndGgYBCABKA1SDXNlc3Npb25MZW5ndGhiBnByb3RvMw=="
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"readFromPoint": "latest"
}
Avro OCF
Avro Object Container File (OCF) is a file format that you can use in batch ingestion jobs.
Ingesting Avro OCF data is similar to ingesting Avro data.
To ingest Avro OCF data, define the following fields within formatSettings:
- format: Set to avro_ocf.
- schema: Optionally pass the JSON object representing the Avro schema for the input data. For Avro OCF, the file itself contains the schema used to generate the file. You only need schema if you want to override the schema in the file.
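Because the OCF header already carries the writer schema, you can inspect it locally to decide whether you need an override. This minimal standard-library sketch assumes the file follows the Avro Object Container File layout: the 4-byte magic Obj\x01 followed by a metadata map whose avro.schema entry holds the schema JSON. read_ocf_schema is a hypothetical helper, and the Apache Avro libraries also provide file readers.

```python
import json

def _read_long(buf: bytes, pos: int):
    """Decode one Avro long (zigzag-encoded varint) at pos; return (value, next_pos)."""
    shift = result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos

def read_ocf_schema(path: str) -> dict:
    """Return the writer schema stored in the header of an Avro Object Container File."""
    with open(path, "rb") as f:
        buf = f.read()  # reads the whole file for simplicity
    if buf[:4] != b"Obj\x01":
        raise ValueError("not an Avro OCF file")
    pos, meta = 4, {}
    # The header metadata is an Avro map<bytes>, encoded as blocks ending with a 0 count.
    while True:
        count, pos = _read_long(buf, pos)
        if count == 0:
            break
        if count < 0:  # a negative count is followed by the block size in bytes
            count = -count
            _, pos = _read_long(buf, pos)
        for _ in range(count):
            key_len, pos = _read_long(buf, pos)
            key = buf[pos:pos + key_len].decode("utf-8")
            pos += key_len
            val_len, pos = _read_long(buf, pos)
            meta[key] = buf[pos:pos + val_len]
            pos += val_len
    return json.loads(meta["avro.schema"])
```

For example, print(json.dumps(read_ocf_schema("kttm-avro.ocf"), indent=2)) shows the schema that Polaris uses when you omit schema.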
Ensure the job type is batch and that you define the job source as uploaded with the correct filenames in fileList.
See the Job v2 API documentation for a full description of required parameters.
This example uses the Avro OCF file kttm-avro.ocf. Be sure to upload the file to the Polaris staging area before starting the ingestion job.
Sample request
The following example request creates an ingestion job that reads an Avro OCF file named kttm-avro.ocf:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "batch",
"target": {
"type": "table",
"tableName": "koalas_avro_ocf"
},
"source": {
"type": "uploaded",
"fileList": [
"kttm-avro.ocf"
],
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_ocf"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "batch",
"target": {
"type": "table",
"tableName": "koalas_avro_ocf"
},
"source": {
"type": "uploaded",
"fileList": [
"kttm-avro.ocf"
],
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_ocf"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "batch",
"target": {
"tableName": "koalas_avro_ocf",
"intervals": [],
"type": "table"
},
"source": {
"fileList": [
"kttm-avro.ocf"
],
"formatSettings": {
"binaryAsString": false,
"extractUnionsByType": false,
"schema": {},
"format": "avro_ocf"
},
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"type": "uploaded"
},
"ingestionMode": "append",
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"id": "d3f03fbd-0aeb-4a6b-b856-fd1d6cf7fca1",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-30T23:44:36.366926098Z",
"lastUpdatedTimestamp": "2023-01-30T23:44:36.366926098Z",
"startedTimestamp": null,
"completedTimestamp": null
}
Confluent Schema Registry
Before connecting to your Confluent Schema Registry, ensure that Schema Registry is enabled for your Confluent Cloud cluster, you have an API key specific to Schema Registry, and you know the endpoint for your Schema Registry server. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.
The following diagram shows an example workflow for ingesting data from a Kafka topic in Confluent Cloud that references a schema from Confluent Schema Registry. The steps in yellow signify actions that occur outside of Polaris. Arrows signify references; for example, the ingestion job references the table, Confluent Cloud connection, and Confluent Schema Registry connection.
This topic assumes you are familiar with the basics of creating tables, starting ingestion jobs, and configuring connections for streaming ingestion jobs. This section provides more information on steps 6-8 of the example workflow.
Create a connection to Confluent Schema Registry
To create a connection to Confluent Schema Registry, send a POST request with a payload containing the following fields:
- type: Set to confluent_schema_registry.
- name: Provide a unique name for the connection.
- description: Optionally describe the connection.
- urls: Provide one or more Schema Registry endpoints.
Send the POST request to the /v2/connections endpoint to create the connection.
See the Connections v2 API documentation for a description of required parameters.
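The payload fields listed above can be assembled and checked in a few lines. This is a minimal Python sketch; schema_registry_connection is a hypothetical helper, and the only validation it applies is the rule stated above that urls contains at least one endpoint.

```python
import json

def schema_registry_connection(name, urls, description=None):
    """Build the POST /v2/connections payload for Confluent Schema Registry.

    schema_registry_connection is a hypothetical helper for illustration.
    """
    urls = list(urls)
    if not urls:
        raise ValueError("provide at least one Schema Registry endpoint in urls")
    payload = {"type": "confluent_schema_registry", "name": name, "urls": urls}
    if description:  # description is optional
        payload["description"] = description
    return payload

print(json.dumps(schema_registry_connection(
    "example_schema", ["https://psrc-xxxxx.us-east-1.aws.confluent.cloud"]), indent=2))
```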
Sample request
The following example request creates a connection named example_schema to Confluent Schema Registry:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "confluent_schema_registry",
"name": "example_schema",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "confluent_schema_registry",
"name": "example_schema",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response and the details of the new connection, for example:
{
"type": "confluent_schema_registry",
"name": "example_schema",
"submittedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"submittedOnTimestamp": "2022-12-13T22:20:54Z",
"modifiedByUser": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"modifiedOnTimestamp": "2022-12-13T22:20:54Z",
"urls": [
"https://psrc-xxxxx.us-east-1.aws.confluent.cloud"
]
}
Authenticate to the Schema Registry
Add your authentication details to the Schema Registry connection by sending a PUT request to the /v2/connections/{name}/secrets endpoint. Replace {name} in the path with the name of your connection in Polaris.
The payload for the request depends on the authentication mode configured for your Confluent Schema Registry server. Confluent Schema Registry supports multiple authentication modes, such as HTTP Basic authentication or SASL authentication.
This example uses Basic authentication in which the username is the Confluent Schema Registry API key, and the password is the API secret. For information on creating API keys in Confluent Schema Registry, see Quick Start for Schema Management on Confluent Cloud.
See the Connections v2 API documentation for a description of required parameters.
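To keep the Schema Registry secret out of your script source, you can read the API key and secret from environment variables and URL-encode the connection name in the path. This is a minimal Python sketch: secrets_request is a hypothetical helper, and the variable names SR_API_KEY and SR_API_SECRET are assumptions, not names that Polaris or Confluent define.

```python
import os
from urllib.parse import quote

def secrets_request(base_url, connection_name, user_var="SR_API_KEY", secret_var="SR_API_SECRET"):
    """Return (url, payload) for PUT /v2/connections/{name}/secrets with Basic auth.

    The environment variable names are hypothetical defaults; the Schema Registry
    API key is the username and the API secret is the password.
    """
    username = os.environ.get(user_var)
    password = os.environ.get(secret_var)
    if not username or not password:
        raise RuntimeError(f"set {user_var} and {secret_var} before running")
    url = f"{base_url.rstrip('/')}/v2/connections/{quote(connection_name, safe='')}/secrets"
    return url, {"type": "basic", "username": username, "password": password}
```

You can then send the payload with requests.request("PUT", url, headers=headers, data=json.dumps(payload)), using the same headers as the sample request.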
Sample request
The following example adds the username and password to a Confluent Schema Registry connection using Basic authentication:
curl --location --request PUT 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/example_schema/secrets' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "basic",
"username": "XXXXXXXXXXXXXXXX",
"password": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/connections/example_schema/secrets"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "basic",
"username": "XXXXXXXXXXXXXXXX",
"password": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("PUT", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 200 OK response.
Start ingestion job
Submit a POST request to the /v2/jobs endpoint to create a streaming ingestion job. In the request payload, you set the data format settings within source.formatSettings.
See the Job v2 API documentation for a full description of required parameters.
To ingest streaming data associated with a schema in Confluent Schema Registry, define the following fields within formatSettings:
- format: Set to avro_stream or protobuf.
- parseSchemaProvider:
  - type: Set to connection.
  - connectionName: Pass the name of the Schema Registry connection.
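Here is a minimal Python sketch of those settings. registry_format_settings is a hypothetical helper that accepts only the two formats listed above.

```python
def registry_format_settings(fmt, connection_name):
    """Build source.formatSettings that reads the schema from a Schema Registry connection.

    registry_format_settings is a hypothetical helper for illustration.
    """
    if fmt not in ("avro_stream", "protobuf"):
        raise ValueError("format must be avro_stream or protobuf")
    return {
        "format": fmt,
        "parseSchemaProvider": {"type": "connection", "connectionName": connection_name},
    }

settings = registry_format_settings("avro_stream", "example_schema")
```

This connectionName refers to the Schema Registry connection (example_schema). It is separate from source.connectionName, which names the Confluent Cloud connection that supplies the records (confluent_sr_avro in the sample request).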
Sample request
The following example request creates an ingestion job that reads streaming Avro data using a schema from Confluent Schema Registry:
curl --location --request POST 'https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs' \
--user ${POLARIS_API_KEY}: \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_sr_avro"
},
"source": {
"type": "connection",
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "connection",
"connectionName": "example_schema"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v2/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "koalas_sr_avro"
},
"source": {
"type": "connection",
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"name": "timestamp",
"dataType": "string"
},
{
"name": "city",
"dataType": "string"
},
{
"name": "session",
"dataType": "string"
},
{
"name": "session_length",
"dataType": "float"
}
],
"formatSettings": {
"format": "avro_stream",
"parseSchemaProvider": {
"type": "connection",
"connectionName": "example_schema"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the details of the ingestion job:
{
"type": "streaming",
"target": {
"tableName": "koalas_sr_avro",
"intervals": [],
"type": "table"
},
"source": {
"connectionName": "confluent_sr_avro",
"inputSchema": [
{
"dataType": "string",
"name": "timestamp"
},
{
"dataType": "string",
"name": "city"
},
{
"dataType": "string",
"name": "session"
},
{
"dataType": "float",
"name": "session_length"
}
],
"formatSettings": {
"parseSchemaProvider": {
"connectionName": "example_schema",
"type": "connection"
},
"format": "avro_stream"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "TIME_PARSE(\"timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "session",
"expression": "\"session\""
},
{
"columnName": "max_session_length",
"expression": "MAX(\"session_length\")"
},
{
"columnName": "__count",
"expression": "COUNT(*)"
}
],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"id": "b5b913c0-5608-48f2-9f2d-4fcc10e73d40",
"desiredExecutionStatus": "running",
"createdBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"lastModifiedBy": {
"username": "api-key-pok_vipgj...bjjvyo",
"userId": "a52cacf6-3ddc-48e5-8675-2f6ec5e0e917"
},
"executionStatus": "pending",
"health": {
"status": "ok"
},
"createdTimestamp": "2023-01-26T19:59:37.781867043Z",
"lastUpdatedTimestamp": "2023-01-26T19:59:37.781867043Z",
"startedTimestamp": null,
"completedTimestamp": null
}