Push event data by API

You can use the Imply Polaris API to push data from an application source into Polaris over HTTP(S). This topic covers the basic patterns for creating a connector that sends data from your application to a Polaris table.

Push stream ingestion for Polaris loads your data from a source data stream into a destination table in Polaris. Push stream ingestion involves the following steps:

  1. Create a connection for push streaming. Use either the UI or API to create a push_streaming connection. You can use the same connection in multiple ingestion jobs to stream the same data into multiple tables. A table may have only one associated streaming job.
  2. Start a job to ingest from the connection. In the ingestion job request, you define the schema of your source data and map or transform the source input fields to the columns in a Polaris table.
  3. Send event data to Polaris. The endpoint where you push data includes the connection name. For example: /v1/projects/PROJECT_ID/events/CONNECTION_NAME.

The following diagram shows an overview of push streaming in Polaris. From left to right, events are sent to the push streaming connection named demo_connection. Polaris interprets and maps the data through ingestion jobs to load into tables. The diagram illustrates a single push streaming connection used for ingesting data into two Polaris tables.

Push streaming diagram

This topic walks you through how to send events to Polaris. If you want to skip the details, check out the example.

Prerequisites

This topic assumes you have the following:

  • An API key with the following permissions:

    • ManageConnections to create and edit connections
    • ManageTables to create and modify tables
    • ManageIngestionJobs to create ingestion jobs pertaining to connections

    In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information on how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.

  • Events that you can push to Polaris.

    • See Supported source data formats for details on data formats.
    • See Streaming use cases for requirements for events.
      • The event timestamp must be within 30 days of ingestion time. If you need to ingest events older than 30 days, configure lateMessageRejectionPeriod accordingly in your streaming ingestion job.
      • A single payload request must not exceed 1 MB in size.
    • As a good practice for high throughput, have your client batch records into a single payload before sending them to Polaris, as illustrated in the sketch after this list. Send a batch when either of the following criteria is met:
      • When the batch size reaches 1 MB.
      • When the batching window, or the time to gather records into a single payload, reaches 500 ms.
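
The following is a minimal Python sketch of client-side batching under these thresholds. EventBatcher and its send callable are hypothetical names for illustration, not part of the Polaris API.

import json
import time

class EventBatcher:
    """Buffers events and flushes a newline-delimited payload when the
    batch approaches 1 MB or the 500 ms batching window elapses.
    Hypothetical helper for illustration; not part of the Polaris API."""
    MAX_BYTES = 1_000_000   # stay under the 1 MB payload limit
    MAX_WINDOW_SECS = 0.5   # 500 ms batching window

    def __init__(self, send):
        self.send = send    # callable that POSTs one payload string
        self.lines = []
        self.size = 0
        self.window_start = 0.0

    def add(self, event):
        line = json.dumps(event)
        # Flush first if this event would push the payload past the limit.
        if self.size + len(line) + 1 > self.MAX_BYTES:
            self.flush()
        if not self.lines:
            self.window_start = time.monotonic()
        self.lines.append(line)
        self.size += len(line) + 1  # +1 for the newline delimiter
        # Flush once the batching window has elapsed.
        if time.monotonic() - self.window_start >= self.MAX_WINDOW_SECS:
            self.flush()

    def flush(self):
        if self.lines:
            self.send("\n".join(self.lines))
            self.lines, self.size = [], 0

Call flush() before your client shuts down to send any remaining buffered events.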

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Create push streaming connection

Create a connection to collect the events sent to Polaris. For push streaming, the connection acts as a bridge between sending events and ingesting the event data.

To create a connection, issue a POST request to the Connections API and pass the connection specification as the request payload. The connection specification is a JSON object containing the name and type of the connection. The connection name is used in requests to push data to Polaris using the Events API. Set the connection type to push_streaming. For example:

{
"type": "push_streaming",
"name": "demo_connection"
}

Sample request

The following example shows how to create a connection for push streaming ingestion:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/connections" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "push_streaming",
"name": "demo_connection"
}'

Sample response

The following example shows a response for successfully creating a connection:

{
"type": "push_streaming",
"name": "demo_connection",
"endpointUrl": "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/events/demo_connection"
}

Start a job to ingest data

Create a job to ingest the data that you send to the Events API. You do not need an active ingestion job to send events; you can push data to the connection endpoint before starting the job, for example to auto-generate the input schema.

If you push events before creating the ingestion job, Polaris ingests them only if readFromPoint is set to earliest. Otherwise, Polaris only ingests data sent after you create the ingestion job. Regardless of the offset, the event timestamps must meet the requirements described in Streaming use cases.

You only need to create the ingestion job once. After initiating ingestion, you can repeatedly send events to Polaris, which automatically ingests the data into the specified table.

To create a streaming job, send a POST request to the Jobs v1 API with the job specification in the request payload. In the ingestion job specification, you define the input fields of your source data and map them to Polaris columns. The job specification contains the following fields:

  • type: Set to streaming for push ingestion.

  • target: An object with fields type and tableName. Set the target type to table and the table name to the name of the destination table. For example:

    "target": {
    "type": "table",
    "tableName": "demo_table"
    }
  • source: An object that references the push streaming connection. You must create the connection before you can reference it in an ingestion job. Set the source type to connection, and assign the connection name to connectionName. List the names and data types of the input fields in inputSchema. Set the incoming data format in formatSettings. See Supported source data formats for details on data formats.

    The following example shows a source object:

    "source": {
    "type": "connection",
    "connectionName": "demo_connection",
    "inputSchema": [
    {
    "name": "event_time",
    "dataType": "string"
    },
    {
    "name": "user",
    "dataType": "string"
    },
    {
    "name": "added",
    "dataType": "float"
    }
    ],
    "formatSettings": {
    "format": "nd-json"
    }
    }
  • mappings: An array that describes how to map the input data to the table schema. Map the timestamp field from the event request body to the __time column. Enclose each input field referenced in an expression in quotation marks. See Map and transform data with input expressions for details and usage notes.

    In this example, we assume the destination table schema matches the schema of the incoming data.

    "mappings": [
    {
    "columnName": "__time",
    "expression": "TIME_PARSE(\"event_time\")"
    },
    {
    "columnName": "user",
    "expression": "\"user\""
    },
    {
    "columnName": "added",
    "expression": "\"added\""
    }
    ]
  • readFromPoint: The position in the stream from which Polaris starts ingesting data. Set to earliest to read from the earliest available point in the stream; use this if the stream already contains data you want to ingest. Set to latest to read from the latest point available in the stream; in this case, Polaris only ingests data published to the stream after the job is created.

The following example shows a complete specification for a push streaming ingestion job:

{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "demo_table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "demo_connection",
    "inputSchema": [
      {
        "name": "event_time",
        "dataType": "string"
      },
      {
        "name": "user",
        "dataType": "string"
      },
      {
        "name": "added",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "nd-json"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"event_time\")"
    },
    {
      "columnName": "user",
      "expression": "\"user\""
    },
    {
      "columnName": "added",
      "expression": "\"added\""
    }
  ]
}

Sample request

The following example shows how to launch an ingestion job referencing the push streaming connection:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "demo_table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "demo_connection",
    "inputSchema": [
      {
        "name": "event_time",
        "dataType": "string"
      },
      {
        "name": "user",
        "dataType": "string"
      },
      {
        "name": "added",
        "dataType": "float"
      }
    ],
    "formatSettings": {
      "format": "nd-json"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"event_time\")"
    },
    {
      "columnName": "user",
      "expression": "\"user\""
    },
    {
      "columnName": "added",
      "expression": "\"added\""
    }
  ]
}'

Sample response

The following example shows a response for successfully starting an ingestion job:

{
  "type": "streaming",
  "id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
  "target": {
    "type": "table",
    "tableName": "demo_table"
  },
  "createTableIfNotExists": true,
  "desiredExecutionStatus": "running",
  "createdBy": {
    "username": "service-account-docs-demo",
    "userId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
  },
  "lastModifiedBy": {
    "username": "service-account-docs-demo",
    "userId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
  },
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "createdTimestamp": "2022-08-01T21:19:48.783889685Z",
  "lastUpdatedTimestamp": "2022-08-01T21:19:48.783889685Z",
  "source": {
    "type": "connection",
    "connectionName": "demo_connection",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "event_time"
      },
      {
        "dataType": "string",
        "name": "user"
      },
      {
        "dataType": "float",
        "name": "added"
      }
    ],
    "formatSettings": {
      "format": "nd-json"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "TIME_PARSE(\"event_time\")"
    },
    {
      "columnName": "user",
      "expression": "\"user\""
    },
    {
      "columnName": "added",
      "expression": "\"added\""
    }
  ]
}

Send events to Polaris

To load events, send them to the endpoint for your connection using the Events API. If a streaming record contains fields that are not in the table schema, Polaris ignores those extra fields.

To send an event to Polaris, issue a POST request to the Events API and pass the event data in the payload. Specify the push streaming connection name in the path parameter.

For example, with a connection named demo_connection, push events to /v1/projects/PROJECT_ID/events/demo_connection.

info

For legacy tables created using the Tables v1 API, the connection is named after the table ID.

You can send a single event or batch multiple events into a single POST request. Ensure each event is delimited by a newline character.
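
For instance, a minimal Python sketch that joins multiple events into one newline-delimited payload:

import json

# Join events into a single newline-delimited payload for one POST request.
events = [
    {"event_time": "2022-04-16T00:46:58.771Z", "user": "GELongstreet", "added": 36},
    {"event_time": "2022-04-17T04:07:28.781Z", "user": "Kolega2357", "added": 13},
]
payload = "\n".join(json.dumps(event) for event in events)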

Sample request

The following example shows how to send event data containing two records to a Polaris table. The field names and data types of the event data match the input schema provided in the job specification. For example, the event time is provided as a string in the event_time field. Update the time values in the example to within the past 30 days.

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/events/demo_connection" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{"event_time":"2022-04-16T00:46:58.771Z","user":"GELongstreet","added":36}
{"event_time":"2022-04-17T04:07:28.781Z","user":"Kolega2357","added":13}'

You can also send gzipped data with the HTTP header Content-Encoding: gzip. For example, consider a file named data.json.gz that contains the following:

{"event_time":"2022-10-16T00:46:58.771Z","user":"GELongstreet","added":36}
{"event_time":"2022-10-17T04:07:28.781Z","user":"Kolega2357","added":13}

Send the request as follows:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/events/demo_connection" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--header "Content-Encoding: gzip" \
--data-binary @data.json.gz
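
The following is a minimal Python sketch of the same gzip request. It assumes EVENTS_ENDPOINT and API_KEY are defined as in the example later in this topic.

import gzip
import json

import requests

# Compress a newline-delimited JSON payload and push it with the
# Content-Encoding: gzip header. EVENTS_ENDPOINT and API_KEY are
# assumed to be defined as in the example later in this topic.
events = [
    {"event_time": "2022-10-16T00:46:58.771Z", "user": "GELongstreet", "added": 36},
    {"event_time": "2022-10-17T04:07:28.781Z", "user": "Kolega2357", "added": 13},
]
payload = gzip.compress("\n".join(json.dumps(e) for e in events).encode("utf-8"))

response = requests.post(
    EVENTS_ENDPOINT,
    headers={
        "Authorization": f"Basic {API_KEY}",
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    },
    data=payload,
)
response.raise_for_status()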

Sample response

The following example shows a successful response for sending event data to a Polaris table:

200 OK

View rate limits

The Events API returns HTTP headers describing the rate limits and number of available requests remaining for your Polaris organization. The following headers are returned:

  • RateLimit-Limit: Maximum number of requests per minute.
  • RateLimit-Remaining: Number of requests remaining in the current minute.
  • RateLimit-Reset: Number of seconds until the request limit resets.
  • X-RateLimit-Limit-Minute: Maximum number of requests per minute. Same as RateLimit-Limit.
  • X-RateLimit-Remaining-Minute: Number of requests remaining in the current minute. Same as RateLimit-Remaining.
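
As an illustration, the following Python sketch uses these headers to back off when requests are throttled. It assumes throttled requests return HTTP 429, and that EVENTS_ENDPOINT and API_KEY are defined as in the example later in this topic; push_with_backoff is a hypothetical helper name.

import time

import requests

# Hypothetical helper: push a payload and back off using the
# RateLimit-Reset header if the request is throttled.
# Assumes throttled requests return HTTP 429.
def push_with_backoff(payload, retries=3):
    for _ in range(retries):
        response = requests.post(
            EVENTS_ENDPOINT,
            headers={
                "Authorization": f"Basic {API_KEY}",
                "Content-Type": "application/json",
            },
            data=payload,
        )
        if response.status_code == 429:
            # Wait until the per-minute request limit resets.
            time.sleep(int(response.headers.get("RateLimit-Reset", "1")))
            continue
        response.raise_for_status()
        return response
    raise RuntimeError("Still rate limited after all retries")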

Monitor streaming

For details on monitoring your streaming ingestion job, see Monitor streaming jobs.

Example

The following example shows how to poll data from Coinbase and push it to Polaris using the Events API.

Before running this example, review the Prerequisites, and ensure you have the following:

  • An API key with the correct permissions
  • A table with the following schema:
    • __time, the primary timestamp column
    • price, a float dimension
  • A push streaming connection, whose name you supply in CONN_NAME
  • A streaming ingestion job that does the following:
    • Specifies the ingestion source as the push streaming connection
    • Specifies the ingestion target as the table
    • Specifies the incoming data format as nd-json
    • Maps event_time and price in the input to __time and price in the table, respectively
import datetime
import json
import os
import requests
import time


# Read API key from environment variable.
# API keys do not expire.
# Do not supply an API key in production scripts, and
# do not check them into version control systems.
# For more information, see https://docs.imply.io/polaris/api-keys
API_KEY = os.getenv("POLARIS_API_KEY")

# Replace the project-scoped URL path parameters with your information.
# For the description of path parameters, refer to https://docs.imply.io/api/polaris/api-reference.
ORGANIZATION_NAME = "example"
REGION = "us-east-1"
CLOUD_PROVIDER = "aws"
PROJECT_ID = "12375ffx-f7x4-4f0x-a1a6-3b3424987ee0"


# Supply the push streaming connection name in the following string variable
CONN_NAME = "coinbase"

# Polaris Events API
EVENTS_ENDPOINT = f"https://{ORGANIZATION_NAME}.{REGION}.{CLOUD_PROVIDER}.api.imply.io/v1/projects/{PROJECT_ID}/events/{CONN_NAME}"

# Called by push_data to send an event to the Polaris connection
def make_post(url, payload):
    def do_post():
        headers = {
            "Authorization": f"Basic {API_KEY}",
            "Content-Type": "application/json"
        }
        return requests.post(url, headers=headers, data=payload)

    response = do_post()

    # Raise an exception on response errors at this point
    response.raise_for_status()

    return response

# Send a POST request and print HTTP response
def push_data(data):
    response = make_post(EVENTS_ENDPOINT, data)
    print(response.status_code, response.reason)
    print(response.text)
    print("----")


while True:

    # Get the price of bitcoin and push it
    btc_resp = requests.get("https://api.coinbase.com/v2/prices/spot?currency=USD")
    btc_resp.raise_for_status()

    # Try the push. If you get a 401 error, verify that your API key has the right permissions.
    push_data(json.dumps({
        "event_time": datetime.datetime.utcnow().isoformat(),
        "price": float(btc_resp.json()["data"]["amount"]),
    }))
    time.sleep(1)

Known limitations

We're regularly updating Polaris to add features and fixes. If you run into an issue, check the Known limitations.

Learn more

See the following topics for more information: