Push event data by API
You can use the Imply Polaris Events API to push data from an application source. This topic covers the basic patterns for creating a connector that sends data from your application to a Polaris table.
Push stream ingestion for Polaris loads your data from a source data stream into a destination table in Polaris. Push stream ingestion is available by default when you create a table.
This topic walks you through how to send events to Polaris.
If you want to skip the details, check out the example.
Prerequisites
This topic assumes you have the following:
- Events that you can push to Polaris. You can stream newline-delimited JSON into Polaris. See Supported data formats for more information.
- A destination table with a defined schema. You can create a schema for an existing table using the UI or API.
- The table ID of your destination table. To determine your table ID, see Identify a destination table.
- An OAuth token with the ManageStreams role. In the examples below, the token value is stored in the variable named IMPLY_TOKEN. See Authenticate API requests to obtain an access token and assign service account roles. Visit User roles reference for more information on roles and their permissions.
Send events to Polaris
To load events, send them to the Events API endpoint for your table. If an event contains columns that are not in the table schema, Polaris ignores those extra columns.
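Polaris ignores columns that are not in the table schema, so you can optionally drop them on the client to keep payloads small. The following is a minimal sketch. It assumes you keep a local copy of the table's column names, and the helper `drop_unknown_fields` and the `TABLE_COLUMNS` set are hypothetical names used only for illustration.

```python
def drop_unknown_fields(event, table_columns):
    """Return a copy of event that keeps only the table's columns plus __time."""
    keep = set(table_columns) | {"__time"}
    return {key: value for key, value in event.items() if key in keep}

# Hypothetical schema for the Wikipedia edit events used in this topic
TABLE_COLUMNS = {"channel", "user", "added", "deleted"}

event = {"__time": "2022-04-16T00:46:58.771Z", "channel": "#en.wikipedia",
         "user": "GELongstreet", "added": 36, "deleted": 0, "comment": "typo fix"}
print(drop_unknown_fields(event, TABLE_COLUMNS))
```

Filtering on the client is optional. Polaris already discards the extra columns, so the only benefit is sending fewer bytes per request.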
To send an event to Polaris, issue a POST request to the Events API and pass the event data in the payload. You can send a single event, or you can batch multiple events as newline-delimited JSON into a single POST request.
Each Polaris organization is limited to 83,334 API calls per minute. We recommend including up to 60 events per request. At that batch size, the effective ceiling is approximately 5 million events per minute (83,334 calls × 60 events ≈ 5,000,000).
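The arithmetic behind that ceiling can be checked with a few lines of Python. This is an illustrative sketch: the constants come from this topic, and the function names are hypothetical. The second function estimates how many requests per minute a given event rate requires at the recommended batch size.

```python
# Limits stated in this topic
CALLS_PER_MINUTE = 83_334   # per Polaris organization
EVENTS_PER_REQUEST = 60     # recommended batch size

def max_events_per_minute(calls_per_minute=CALLS_PER_MINUTE,
                          events_per_request=EVENTS_PER_REQUEST):
    """Upper bound on events per minute when every request carries a full batch."""
    return calls_per_minute * events_per_request

def requests_needed(events_per_minute, events_per_request=EVENTS_PER_REQUEST):
    """POST requests per minute needed to send events_per_minute events."""
    return -(-events_per_minute // events_per_request)  # ceiling division

print(max_events_per_minute())      # 5000040
print(requests_needed(1_000_000))   # 16667
```

If `requests_needed` for your expected peak rate approaches `CALLS_PER_MINUTE`, increase the number of events per request rather than the request rate.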
Event payload requirements
The following requirements apply to incoming events:
- Events must contain a timestamp column named __time with event times stored in ISO 8601 format, for example 2022-04-16T00:46:58.771Z.
- The event timestamp must be within 7 days of ingestion time. Polaris rejects events with timestamps older than 7 days.
- A single payload request must not exceed 1 MB in size.
- The maximum throughput for incoming events is 100 MBps.
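To catch problems before Polaris rejects them, you can check events on the client. The following is a minimal sketch, not part of any Polaris SDK, and `check_event` and `payload_size` are hypothetical helper names. It checks that `__time` is present, parses as ISO 8601, and is no older than 7 days. It does not check for future timestamps, because this topic only documents the lower bound. It also treats timestamps without a time zone as UTC and takes 1 MB to mean 1,000,000 bytes. Both are assumptions.

```python
import datetime
import json

MAX_PAYLOAD_BYTES = 1_000_000  # assumption: 1 MB taken as 10^6 bytes (conservative)
MAX_EVENT_AGE = datetime.timedelta(days=7)

def check_event(event, now=None):
    """Return a list of problems with event; an empty list means it passes."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    raw_time = event.get("__time")
    if raw_time is None:
        return ["missing __time"]
    if not isinstance(raw_time, str):
        return ["__time is not an ISO 8601 timestamp"]
    try:
        # fromisoformat() does not accept a trailing "Z" before Python 3.11
        ts = datetime.datetime.fromisoformat(raw_time.replace("Z", "+00:00"))
    except ValueError:
        return ["__time is not an ISO 8601 timestamp"]
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=datetime.timezone.utc)  # assumption: naive times are UTC
    if now - ts > MAX_EVENT_AGE:
        return ["__time is older than 7 days"]
    return []

def payload_size(events):
    """Size in bytes of events serialized as newline-delimited JSON."""
    return len("\n".join(json.dumps(e) for e in events).encode("utf-8"))
```

Before sending a batch, you might discard or log events for which `check_event` returns problems. You can also confirm that `payload_size(batch) <= MAX_PAYLOAD_BYTES`.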
For high throughput, configure your client to batch records into a single payload before sending them to Polaris. We recommend sending a batch as soon as any of the following conditions is met:
- When 60 events are collected into a single payload.
- When the batch size reaches 1 MB.
- When the batching window, or the time to gather records into a single payload, reaches 500 ms.
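The batching rules above can be sketched as a small buffer that flushes on whichever threshold is reached first. This is an illustrative sketch under stated assumptions. The `EventBatcher` class and its `send` callback are hypothetical, and `send` can be any function that takes the newline-delimited JSON payload string, for example one that POSTs it to the Events API. It assumes 1 MB means 1,000,000 bytes and flushes *before* adding an event that would push the payload past that size.

```python
import json
import time

class EventBatcher:
    """Buffer events and pass a newline-delimited JSON payload to send() when full."""

    def __init__(self, send, max_events=60, max_bytes=1_000_000, max_wait_s=0.5):
        self.send = send                # hypothetical callback, e.g. a POST to the Events API
        self.max_events = max_events    # flush at 60 events
        self.max_bytes = max_bytes      # assumption: 1 MB taken as 10^6 bytes
        self.max_wait_s = max_wait_s    # 500 ms batching window
        self._lines, self._size, self._started = [], 0, None

    def add(self, event):
        line = json.dumps(event)
        # +1 for the newline separator; slightly overcounts, which is conservative
        line_size = len(line.encode("utf-8")) + 1
        # Flush first if this event would push the payload past max_bytes
        if self._lines and self._size + line_size > self.max_bytes:
            self.flush()
        if not self._lines:
            self._started = time.monotonic()
        self._lines.append(line)
        self._size += line_size
        if len(self._lines) >= self.max_events or self._window_elapsed():
            self.flush()

    def poll(self):
        """Call periodically so a partial batch is sent once the window closes."""
        if self._lines and self._window_elapsed():
            self.flush()

    def flush(self):
        if self._lines:
            self.send("\n".join(self._lines))
        self._lines, self._size, self._started = [], 0, None

    def _window_elapsed(self):
        return (self._started is not None
                and time.monotonic() - self._started >= self.max_wait_s)
```

Because `add` only checks the window when an event arrives, a producer that can go quiet should call `poll` on a timer. Otherwise, a partial batch could wait longer than 500 ms.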
Sample request
The following example shows how to send event data containing two records to a Polaris table:
curl --location --request POST 'https://api.imply.io/v1/events/f972c0f5-e5f2-41b5-a5b3-9b64cf365cae' \
--header "Authorization: Bearer $IMPLY_TOKEN" \
--header 'Content-Type: application/json' \
--data-raw '{"__time":"2022-04-16T00:46:58.771Z","channel":"#en.wikipedia","user":"GELongstreet","added":36,"deleted":0}
{"__time":"2022-04-17T04:07:28.781Z","channel":"#de.wikipedia","user":"Kolega2357","added":13,"deleted":16}'
import json
import os
import requests
# Read the OAuth token from the IMPLY_TOKEN environment variable
IMPLY_TOKEN = os.environ["IMPLY_TOKEN"]
url = "https://api.imply.io/v1/events/f972c0f5-e5f2-41b5-a5b3-9b64cf365cae"
# Serialize the two events as newline-delimited JSON
events = [
    {"__time": "2022-04-16T00:46:58.771Z", "channel": "#en.wikipedia", "user": "GELongstreet", "added": 36, "deleted": 0},
    {"__time": "2022-04-17T04:07:28.781Z", "channel": "#de.wikipedia", "user": "Kolega2357", "added": 13, "deleted": 16},
]
payload = "\n".join(json.dumps(event) for event in events)
headers = {
    "Authorization": "Bearer {token}".format(token=IMPLY_TOKEN),
    "Content-Type": "application/json",
}
response = requests.post(url, headers=headers, data=payload)
print(response.status_code, response.text)
Sample response
The following example shows a successful response for sending event data to a Polaris table:
200 OK
View rate limits
The Events API returns HTTP headers describing the rate limits and number of available requests remaining for your Polaris organization. The following headers are returned:
- RateLimit-Limit: Maximum number of requests per minute.
- RateLimit-Remaining: Number of requests remaining in the current minute.
- RateLimit-Reset: Number of seconds until the request limit resets.
- X-RateLimit-Limit-Minute: Maximum number of requests per minute. Same as RateLimit-Limit.
- X-RateLimit-Remaining-Minute: Number of requests remaining in the current minute. Same as RateLimit-Remaining.
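A client can use these headers to back off instead of sending requests that will be rejected. The following is a minimal sketch. The function names are hypothetical, and it assumes the header values are integer strings as described above. `headers` can be any mapping, such as `response.headers` from the `requests` library, which looks up names case-insensitively.

```python
import time

def seconds_until_allowed(headers):
    """Return how long to wait before the next request, based on rate limit headers.

    Returns 0.0 when requests remain in the current minute or when the
    headers are absent.
    """
    remaining = headers.get("RateLimit-Remaining")
    reset = headers.get("RateLimit-Reset")
    if remaining is None or reset is None:
        return 0.0
    if int(remaining) > 0:
        return 0.0
    return float(reset)

def wait_for_rate_limit(headers):
    """Sleep until the per-minute request limit resets, if it has been used up."""
    delay = seconds_until_allowed(headers)
    if delay > 0:
        time.sleep(delay)
```

You could call `wait_for_rate_limit(response.headers)` after each POST to the Events API, before sending the next batch.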
Monitor streaming
Navigate to Monitoring > Streaming to view dashboards that monitor the overall health of your event stream including:
- Ingest latency
- The number of events processed
- Events rejected because they arrived too late
- Unparseable events
To view specific errors related to event ingestion, navigate to the table details by selecting View details from the ... (Actions) menu.
Example
The following example shows how to poll data from Coinbase and push it to Polaris using the Events API.
import datetime
import json
import time
import requests
# Replace with your organization name
ORG_NAME = ""
# Supply the client ID and client secret in the following string variables.
# Do not supply OAuth credentials in production scripts and
# do not check them into version control systems.
# See https://docs.imply.io/polaris/oauth/ for more information.
CLIENT_ID = ""
CLIENT_SECRET = ""
# Supply the table ID in the following string variable
TABLE_ID = ""
# Store endpoints for Polaris OAuth API and Events API
TOKEN_ENDPOINT = "https://id.imply.io/auth/realms/{org_name}/protocol/openid-connect/token".format(org_name=ORG_NAME)
EVENTS_ENDPOINT = "https://api.imply.io/v1/events/{table_id}".format(table_id=TABLE_ID)
access_token = None
def update_token():
    # Request a new access token using the OAuth client credentials grant
    global access_token
    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    }
    response = requests.post(TOKEN_ENDPOINT, data=params)
    response.raise_for_status()
    access_token = response.json()["access_token"]
def make_post(url, body):
    def do_post():
        headers = {
            "Authorization": "Bearer {token}".format(token=access_token),
            "Content-Type": "application/json",
        }
        return requests.post(url, headers=headers, data=body)
    # Fetch a token before the first request
    if access_token is None:
        update_token()
    response = do_post()
    # If the token expired, refresh it and retry the request once
    if response.status_code == 401:
        print("Refreshing token")
        update_token()
        response = do_post()
    # Raise an exception on response errors at this point
    response.raise_for_status()
    return response
def push_data(event):
    # Serialize the event to a JSON string; passing a dict as data= would form-encode it
    response = make_post(EVENTS_ENDPOINT, json.dumps(event))
    print(response.status_code, response.reason)
    print(response.text)
    print("----")
while True:
    # Get the price of bitcoin and push it
    btc_resp = requests.get("https://api.coinbase.com/v2/prices/spot?currency=USD")
    btc_resp.raise_for_status()
    push_data({
        # Current UTC time in ISO 8601 format, for example 2022-04-16T00:46:58.771Z
        "__time": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "price": btc_resp.json()["data"]["amount"],
    })
    time.sleep(1)
Known limitations
We're regularly updating Polaris to add features and fixes. If you run into an issue, check the Known limitations.
Learn more
See the following topics for more information:
- Events API for reference on push stream ingestion.
- Tables API for reference on creating and managing tables.
- Ingest batch data by API for ingesting batch data into a table.