Push event data by API

You can use the Imply Polaris Events API to push data from an application source. This topic covers the basic patterns for creating a connector that sends data from your application to a Polaris table.

Push stream ingestion for Polaris loads your data from a source data stream into a destination table in Polaris. Push stream ingestion is available by default when you create a table.

This topic walks you through sending events to Polaris. To skip the details, see the Example section.

Prerequisites

This topic assumes you have the following:

  • Events that you can push to Polaris. You can stream newline-delimited JSON into Polaris. See Supported data formats for more information.
  • A destination table with a defined schema. You can create a schema for an existing table using the UI or API.
  • The table ID of your destination table. To determine your table ID, see Identify a destination table.
  • An OAuth token with the ManageStreams role. In the examples below, the token value is stored in the variable named IMPLY_TOKEN. See Authenticate API requests to obtain an access token and assign service account roles. Visit User roles reference for more information on roles and their permissions.

Send events to Polaris

To load events, send them to the Events API endpoint for your table. If a streaming record contains columns that are not in the table schema, Polaris ignores those extra columns.
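
Because Polaris ignores columns that are not in the table schema, you can optionally drop them on the client to keep payloads small. The following is a minimal sketch: `TABLE_COLUMNS` and `keep_schema_columns` are hypothetical names, and you would fill the column set from your own table schema.

```python
# Hypothetical set of column names defined in your table schema.
TABLE_COLUMNS = {"__time", "channel", "user", "added", "deleted"}


def keep_schema_columns(event, columns=TABLE_COLUMNS):
    """Return a copy of the event that contains only columns in the table schema."""
    return {name: value for name, value in event.items() if name in columns}


record = {"__time": "2022-04-16T00:46:58.771Z", "channel": "#en.wikipedia",
          "user": "GELongstreet", "added": 36, "deleted": 0, "debug_id": "abc123"}
print(keep_schema_columns(record))
```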

To send an event to Polaris, issue a POST request to the Events API and pass the event data in the payload. You can send a single event or batch multiple events into a single POST request.
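
A batched request body is newline-delimited JSON, with one JSON object per line. The following minimal Python sketch builds such a payload from a list of dictionaries; `to_ndjson` is a hypothetical helper name, not part of any Polaris SDK.

```python
import json


def to_ndjson(events):
    """Serialize a list of event dicts as newline-delimited JSON, one object per line."""
    # separators=(",", ":") drops optional whitespace to keep the payload compact
    return "\n".join(json.dumps(event, separators=(",", ":")) for event in events)


events = [
    {"__time": "2022-04-16T00:46:58.771Z", "channel": "#en.wikipedia", "added": 36},
    {"__time": "2022-04-17T04:07:28.781Z", "channel": "#de.wikipedia", "added": 13},
]
print(to_ndjson(events))
```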

Each Polaris organization is limited to 83,334 calls per minute. We recommend including up to 60 events in each request. At that batch size, the call limit caps your ingestion rate at approximately 5 million events per minute.
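
The approximate ceiling comes from multiplying the per-organization call limit by the recommended batch size. A quick check in Python:

```python
CALLS_PER_MINUTE = 83_334   # per-organization call limit
EVENTS_PER_REQUEST = 60     # recommended maximum batch size

max_events_per_minute = CALLS_PER_MINUTE * EVENTS_PER_REQUEST
print(f"{max_events_per_minute:,} events per minute")  # 5,000,040 events per minute
```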

Event payload requirements

The following requirements apply to incoming events:

  • Events must contain a timestamp column named __time with event times stored in ISO 8601 format.
  • The event timestamp must be within 7 days of ingestion time. Polaris rejects events with timestamps older than 7 days.
  • A single payload request must not exceed 1 MB in size.
  • The maximum throughput for incoming events is 100 MBps.
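
A client can check these requirements locally to catch events that Polaris would reject before sending them. The following is a minimal sketch, not part of any Polaris SDK: `parse_time`, `check_event`, and `check_payload` are hypothetical helpers, timestamps without a time zone are assumed to be UTC, and 1 MB is interpreted conservatively as 1,000,000 bytes. The optional `now` argument exists only to make the check testable.

```python
import datetime

MAX_PAYLOAD_BYTES = 1_000_000               # assumption: 1 MB read as 10**6 bytes
MAX_EVENT_AGE = datetime.timedelta(days=7)


def parse_time(value):
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z'; assume UTC if no offset."""
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"       # fromisoformat() rejects 'Z' before Python 3.11
    ts = datetime.datetime.fromisoformat(value)  # raises ValueError on malformed input
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=datetime.timezone.utc)
    return ts


def check_event(event, now=None):
    """Raise ValueError if the event lacks __time or its timestamp is older than 7 days."""
    if "__time" not in event:
        raise ValueError("event has no __time column")
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    if now - parse_time(event["__time"]) > MAX_EVENT_AGE:
        raise ValueError("event timestamp is older than 7 days")


def check_payload(payload):
    """Raise ValueError if the UTF-8 encoded payload exceeds the size limit."""
    size = len(payload.encode("utf-8"))
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError(f"payload is {size} bytes; limit is {MAX_PAYLOAD_BYTES}")
```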

As a good practice for high throughput, configure your client to batch records into a single payload before sending them to Polaris. We recommend sending a batch as soon as any of the following conditions is met:

  • When 60 events are collected into a single payload.
  • When the batch size reaches 1 MB.
  • When the batching window, or the time to gather records into a single payload, reaches 500 ms.
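
The three flush conditions can be combined in a small client-side buffer. The following minimal Python sketch assumes a `send` callable that you supply, for example one that POSTs the payload string to your table's Events API endpoint; `EventBatcher` is a hypothetical name. The time window is checked only when an event is added, so a production client would also need a timer that flushes an idle batch.

```python
import json
import time


class EventBatcher:
    """Buffer events and flush them as newline-delimited JSON when a threshold is hit."""

    def __init__(self, send, max_events=60, max_bytes=1_000_000, max_wait=0.5,
                 clock=time.monotonic):
        self.send = send              # callable that receives the NDJSON payload string
        self.max_events = max_events
        self.max_bytes = max_bytes    # assumption: 1 MB read as 10**6 bytes
        self.max_wait = max_wait      # batching window in seconds (500 ms)
        self.clock = clock
        self.lines = []
        self.size = 0
        self.started = None           # time the first event of the current batch arrived

    def add(self, event):
        line = json.dumps(event, separators=(",", ":"))
        line_bytes = len(line.encode("utf-8")) + 1   # +1 for the newline separator
        # Flush first if this event would push the batch past the byte limit
        if self.lines and self.size + line_bytes > self.max_bytes:
            self.flush()
        if not self.lines:
            self.started = self.clock()
        self.lines.append(line)
        self.size += line_bytes
        # Flush when the batch is full or the batching window has elapsed
        if (len(self.lines) >= self.max_events
                or self.clock() - self.started >= self.max_wait):
            self.flush()

    def flush(self):
        """Send any buffered events and start a new batch."""
        if self.lines:
            self.send("\n".join(self.lines))
        self.lines = []
        self.size = 0
        self.started = None
```

Passing `sent.append` as `send`, where `sent` is a list, collects payloads in memory, which is convenient for testing. Call `flush()` on shutdown so the last partial batch is not lost.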

Sample request

The following example shows how to send event data containing two records to a Polaris table:

cURL

curl --location --request POST 'https://api.imply.io/v1/events/f972c0f5-e5f2-41b5-a5b3-9b64cf365cae' \
--header "Authorization: Bearer $IMPLY_TOKEN" \
--header 'Content-Type: application/json' \
--data-raw '{"__time":"2022-04-16T00:46:58.771Z","channel":"#en.wikipedia","user":"GELongstreet","added":36,"deleted":0}
{"__time":"2022-04-17T04:07:28.781Z","channel":"#de.wikipedia","user":"Kolega2357","added":13,"deleted":16}'

Python

import json
import os

import requests

# Read the OAuth token from the IMPLY_TOKEN environment variable
IMPLY_TOKEN = os.environ["IMPLY_TOKEN"]

url = "https://api.imply.io/v1/events/f972c0f5-e5f2-41b5-a5b3-9b64cf365cae"

events = [
    {"__time": "2022-04-16T00:46:58.771Z", "channel": "#en.wikipedia", "user": "GELongstreet", "added": 36, "deleted": 0},
    {"__time": "2022-04-17T04:07:28.781Z", "channel": "#de.wikipedia", "user": "Kolega2357", "added": 13, "deleted": 16},
]

# Serialize the events as newline-delimited JSON: one JSON object per line
payload = "\n".join(json.dumps(event) for event in events)

headers = {
    "Authorization": "Bearer {token}".format(token=IMPLY_TOKEN),
    "Content-Type": "application/json",
}

response = requests.post(url, headers=headers, data=payload)

print(response.status_code, response.text)

Sample response

The following example shows a successful response for sending event data to a Polaris table:

200 OK

View rate limits

The Events API returns HTTP headers describing the rate limits and number of available requests remaining for your Polaris organization. The following headers are returned:

  • RateLimit-Limit: Maximum number of requests per minute.
  • RateLimit-Remaining: Number of requests remaining in the current minute.
  • RateLimit-Reset: Number of seconds until the request limit resets.
  • X-RateLimit-Limit-Minute: Maximum number of requests per minute. Same as RateLimit-Limit.
  • X-RateLimit-Remaining-Minute: Number of requests remaining in the current minute. Same as RateLimit-Remaining.
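
A client can use these headers to pause before it exhausts the per-minute quota. The following minimal sketch works on any mapping of header names to values, such as `response.headers` from the `requests` library. `seconds_to_wait` is a hypothetical helper, and the fallbacks (no wait when the headers are absent, 60 seconds when `RateLimit-Reset` is missing) are assumptions.

```python
def seconds_to_wait(headers):
    """Return how long to pause before the next request, based on rate-limit headers."""
    remaining = headers.get("RateLimit-Remaining")
    if remaining is None or int(remaining) > 0:
        return 0.0          # requests are still available, or no header was returned
    # No requests left this minute: wait until the limit resets.
    # Assumption: fall back to a full minute if RateLimit-Reset is missing.
    return float(headers.get("RateLimit-Reset", 60))
```

For example, after each request a client could call `time.sleep(seconds_to_wait(response.headers))`.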

Monitor streaming

Navigate to Monitoring > Streaming to view dashboards that monitor the overall health of your event stream, including:

  • Ingest latency
  • The number of events processed
  • Events rejected because they arrived too late
  • Unparseable events

To view specific errors related to event ingestion, open the table details: select View details from the ... (Actions) menu for the table.

Example

The following example shows how to poll the Bitcoin spot price from the Coinbase API once per second and push each price to Polaris using the Events API.

import datetime
import json
import time

import requests

# Supply your Polaris organization name
ORG_NAME = ""

# Supply the client ID and client secret in the following string variables.
# Do not supply OAuth credentials in production scripts and
# do not check them into version control systems.
# See https://docs.imply.io/polaris/oauth/ for more information.
CLIENT_ID = ""
CLIENT_SECRET = ""

# Supply the table ID in the following string variable
TABLE_ID = ""

# Store endpoints for Polaris OAuth API and Events API
TOKEN_ENDPOINT = "https://id.imply.io/auth/realms/{org_name}/protocol/openid-connect/token".format(org_name=ORG_NAME)
EVENTS_ENDPOINT = "https://api.imply.io/v1/events/{table_id}".format(table_id=TABLE_ID)

access_token = None


def update_token():
    """Request a new access token using the OAuth client credentials flow."""
    global access_token

    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    }

    response = requests.post(TOKEN_ENDPOINT, data=params)
    response.raise_for_status()

    access_token = response.json()["access_token"]


def make_post(url, payload):
    """POST a JSON string to url, refreshing the access token once if it is missing or expired."""
    def do_post():
        headers = {
            "Authorization": "Bearer {token}".format(token=access_token),
            "Content-Type": "application/json"
        }
        return requests.post(url, headers=headers, data=payload)

    response = do_post()

    # If the token is missing or expired, refresh it and retry once.
    # Retrying only once avoids an infinite loop when the credentials are invalid.
    if response.status_code == 401:
        print("Refreshing token")
        update_token()
        response = do_post()

    # Raise an exception on response errors at this point
    response.raise_for_status()

    return response


def push_data(event):
    # Serialize the event to a JSON string; passing a dict as data would form-encode it
    response = make_post(EVENTS_ENDPOINT, json.dumps(event))
    print(response.status_code, response.reason)
    print(response.text)
    print("----")


while True:
    # Get the price of bitcoin and push it
    btc_resp = requests.get("https://api.coinbase.com/v2/prices/spot?currency=USD")
    btc_resp.raise_for_status()

    push_data({
        # Timezone-aware UTC timestamp in ISO 8601 format
        "__time": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "price": btc_resp.json()["data"]["amount"],
    })
    time.sleep(1)

Known limitations

We're regularly updating Polaris to add features and fixes. If you run into an issue, check the Known limitations.

Learn more

See the following topics for more information:

  • Events API for reference on push stream ingestion.
  • Tables API for reference on creating and managing tables.
  • Ingest batch data by API for ingesting batch data into a table.
Copyright © 2022 Imply Data, Inc