
Multiple topics per connection

Connections in Imply Polaris describe the sources from which you ingest data into tables. When your data comes from an Apache Kafka streaming platform, including managed Kafka services such as Confluent Cloud, you can create a single connection associated with multiple Kafka topics. You cannot use multi-topic connections with Amazon Kinesis. An ingestion job that reads from a multi-topic Kafka connection ingests data from all topics within the scope of the connection.

This topic describes multi-topic connections in Polaris. Before continuing, you should be familiar with the concepts of creating connections and ingestion jobs.

Prerequisites

The following details must be identical for all topics associated with a multi-topic connection:

  • Bootstrap servers
  • Security protocol and authentication credentials

Create a multi-topic connection

To create a multi-topic connection, select the Use regex for topic name checkbox below Topic name.

In Topic name, specify a regular expression to identify topic names. The pattern must match the entire topic name: for example, topic[0-9]* matches topic1 and topic23 but not topicAB.
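
To sanity check a pattern before you save the connection, you can test it locally. This sketch uses Python's re module, which evaluates a simple pattern like this one the same way as Kafka's Java regex engine:

    import re

    pattern = re.compile(r"topic[0-9]*")

    # fullmatch mirrors Kafka's whole-name matching semantics
    for name in ["topic1", "topic23", "topicAB", "topic"]:
        print(name, bool(pattern.fullmatch(name)))

    # topic1 True, topic23 True, topicAB False, topic True
    # (the bare name "topic" also matches, since * allows zero digits)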

[Screenshot: Create multi-topic connection]

When you test the connection, Polaris seeks to the beginning and end offsets of all partitions for all matching topics. If the test fails, verify the bootstrap servers and authentication credentials. A test may also fail if any of the topics contains an invalid partition.
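
To reproduce a similar check yourself, the following sketch uses the confluent-kafka Python client to fetch the beginning and end offsets of every partition for each matching topic. The bootstrap address and plaintext setup are placeholders; substitute your own connection details:

    import re
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "connection-test",          # required by the client; unused here
    })

    pattern = re.compile(r"topic[0-9]*")
    metadata = consumer.list_topics(timeout=10)

    for name, topic in metadata.topics.items():
        if not pattern.fullmatch(name):
            continue
        for pid in topic.partitions:
            low, high = consumer.get_watermark_offsets(
                TopicPartition(name, pid), timeout=10)
            print(f"{name}[{pid}]: offsets {low}..{high}")

    consumer.close()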

Ingest from a multi-topic connection

This example shows how Polaris treats multi-topic connections during ingestion. To follow along, create three topics and push data to each, using the structure and examples below (a producer sketch follows the list):

  • Events in topic1 have the properties event_time, sessionId, and browseTime

    {"event_time": "2023-10-16T00:00:00.000Z", "sessionId": "S834761", "browseTime": 609.001}
  • Events in topic2 have the properties event_time, browseTime, discount, and purchaseTotal

    {"event_time": "2023-10-17T00:00:00.000Z", "browseTime": "3045.987", "discount": 48.42, "purchaseTotal": 2054.13}
  • Events in topic3 have the properties event_time, discount, and purchaseTotal

    {"event_time": "2023-10-18T00:00:00.000Z", "discount": 4.59, "purchaseTotal": 688.70}
  1. Create a multi-topic connection. For the topic name, enter topic[123].

  2. Start a new ingestion job using the multi-topic connection as the source. In the Parse data step, select the Parse Kafka metadata toggle. Polaris then includes a column named kafka.topic that contains the name of the topic that produced each row.

    [Screenshot: Parse from multi-topic connection]

  3. In the Map source to table step, apply the following mappings for the given table columns:

    • __time mapped to TIME_PARSE("event_time")
    • browseTime mapped to "browseTime"
    • kafka.topic mapped to "kafka.topic"
    • sessionId mapped to "sessionId"
    • valueTotal mapped to "discount" + "purchaseTotal"

    You can remove extra columns, such as columns from the Kafka metadata or columns already used in another mapping.

    [Screenshot: Map from multi-topic connection]
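
    To see how these mappings behave across topics with different schemas, here is a small Python model of the behavior described in step 5. This is an illustration, not Polaris code: missing input fields become null, and the valueTotal expression yields null when either input is null:

    def map_row(event, topic):
        """Model of Polaris applying the table mappings to one Kafka event."""
        discount = event.get("discount")
        purchase = event.get("purchaseTotal")
        return {
            "__time": event["event_time"],          # TIME_PARSE("event_time")
            "browseTime": event.get("browseTime"),  # null if the topic lacks it
            "kafka.topic": topic,                   # from the Kafka metadata
            "sessionId": event.get("sessionId"),
            # "discount" + "purchaseTotal" is null unless both are present
            "valueTotal": (discount + purchase
                           if discount is not None and purchase is not None
                           else None),
        }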

  4. Click Start ingestion to launch the ingestion job.

  5. On the table view, select Query > SQL console to query the data in this example.

    SELECT * FROM "Triple topic data"

    Notice the following details:

    • When you select a multi-topic connection as the source for an ingestion job, Polaris applies the input expressions to rows from all topics. For example, the expression for valueTotal applies to rows that come from both topic2 and topic3.

    • If a column exists in one topic but not the others, Polaris fills in null values for rows from the topics that lack the column. Rows produced by topic1 have a null value for valueTotal, and rows produced by topic3 have null values for browseTime.

    • If the data types of the input data differ between topics, Polaris assigns the widest data type that encompasses the data. The browseTime column has the string data type because topic1 sent this value as a float and topic2 sent it as a string.

    [Screenshot: Query from multi-topic connection]
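
    Based on the three sample events, the query returns rows like the following. The valueTotal values come from 48.42 + 2054.13 = 2102.55 and 4.59 + 688.70 = 693.29, and browseTime has the string type because of the widening described above:

    __time                   | browseTime | kafka.topic | sessionId | valueTotal
    -------------------------|------------|-------------|-----------|-----------
    2023-10-16T00:00:00.000Z | 609.001    | topic1      | S834761   | null
    2023-10-17T00:00:00.000Z | 3045.987   | topic2      | null      | 2102.55
    2023-10-18T00:00:00.000Z | null       | topic3      | null      | 693.29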