PubSub 101 - Querying Topics with SQL

Patrick Deziel | Friday, Dec 8, 2023 | Ensign, enSQL, PubSub 101

Real-time data streams are powerful, but sometimes you need to convert the stream into a batch or replay parts of the stream. Here’s how to write customized SQL to more deeply understand and interact with your historic data.

Heads Up: This module assumes you’ve already created a topic and published some data to it.

If you haven’t already, work through the previous modules so that you have a topic and some data to work with! In them, you’ll create an Ensign project, get familiar with the PyEnsign SDK, create your first topic, and finally build a publisher and run a subscriber so that your topic has data ready to go for this module.

Event Persistence

You already know that Ensign can stream events for you through publishers and subscribers, and sometimes purely stateless processing is all you need. In many cases, however, you need to see what was published in the past: to recover from unexpected failures, to perform historical or trend analytics, or to debug asynchronous workflows.

By default, Ensign persists all of the events that have been published to all of your topics. This makes it possible to retrieve any previous event or to replay a stream of events from any point in time (that means you’re a time traveler! Great Scott!).

enSQL

Ensign implements a query language called enSQL. Its syntax is the same as classical SQL, except that it lets you query Ensign topics over specific windows of time. Just as you query tables in a relational database, with enSQL you query topics.

Here’s a visual representation of the basic syntax:

[Figure: enSQL Syntax]

For the complete grammar, check out the full enSQL documentation on syntax and query operators.
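As a rough text companion to the syntax diagram, here is a sketch of a hypothetical helper, `build_query`, that assembles the enSQL strings used in this module. It only covers the pieces this module uses (a topic, an optional event type, and the OFFSET and LIMIT clauses introduced below), in the clause order the module's examples use. It is not part of PyEnsign, and the full enSQL grammar supports more.

```python
from typing import Optional


def build_query(
    topic: str,
    event_type: Optional[str] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> str:
    """Assemble an enSQL query string (hypothetical helper, not part of PyEnsign)."""
    # Select from the whole topic, or only from one event type within it.
    source = topic if event_type is None else f"{topic}.{event_type}"
    parts = [f"SELECT * FROM {source}"]
    # Windowing clauses, in the same order used in this module's examples.
    if offset is not None:
        parts.append(f"OFFSET {offset}")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    return " ".join(parts)


print(build_query("flight-positions"))                      # SELECT * FROM flight-positions
print(build_query("flight-positions", offset=1, limit=3))   # SELECT * FROM flight-positions OFFSET 1 LIMIT 3
```

Building the string from parts keeps each optional clause independent, so you can add or drop one without touching the others.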

Your First Query

Let’s return to the flight tracker project we’ve been working on in the last few modules and see if we can run some queries. Consider the query below.

SELECT * FROM flight-positions

You can interpret the query as “select all the events from the flight-positions topic”. Try running it from the topic dashboard for the flight-positions topic, which you can get to from the project dashboard. You should see the query results below.

[Figure: Simple Query]

Note that the events are returned in the order that they were published, starting from the very first event in the topic to the most recent event. This turns out to be a useful property because it allows you to “replay” the event stream.

Of course, you can also run queries directly from the Python SDK. With the PyEnsign client, the query() method returns a cursor that you can fetch events from with fetchone(), fetchmany(), and fetchall(), similar to how you would process query results with a database adapter like psycopg2.
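If you want to experiment with the cursor pattern before connecting to Ensign, the sketch below defines a hypothetical in-memory `StubCursor` that mimics the interface described above. It assumes DB-API-like semantics (`fetchone()` returns `None` once results are exhausted, `fetchmany(n)` returns at most `n` events, `fetchall()` returns whatever remains); check the PyEnsign documentation for the real cursor's exact behavior.

```python
import asyncio
from typing import Any, List, Optional


class StubCursor:
    """Hypothetical in-memory stand-in for a PyEnsign query cursor."""

    def __init__(self, events: List[Any]):
        self._events = list(events)
        self._pos = 0  # index of the next event to return

    async def fetchone(self) -> Optional[Any]:
        # Return the next event, or None when nothing is left.
        if self._pos >= len(self._events):
            return None
        event = self._events[self._pos]
        self._pos += 1
        return event

    async def fetchmany(self, size: int) -> List[Any]:
        # Return up to `size` events and advance the cursor past them.
        batch = self._events[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch

    async def fetchall(self) -> List[Any]:
        # Return every remaining event.
        return await self.fetchmany(len(self._events) - self._pos)

    def __aiter__(self):
        return self

    async def __anext__(self) -> Any:
        # Async iteration yields the remaining events in order.
        event = await self.fetchone()
        if event is None:
            raise StopAsyncIteration
        return event


async def demo():
    cursor = StubCursor(["event-0", "event-1", "event-2", "event-3"])
    first = await cursor.fetchone()      # "event-0"
    batch = await cursor.fetchmany(2)    # ["event-1", "event-2"]
    rest = [e async for e in cursor]     # ["event-3"]
    return first, batch, rest


print(asyncio.run(demo()))
```

Note that every fetch advances the same position, so mixing fetchone(), fetchmany(), and iteration walks through the results exactly once, in publish order.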

Retrieve the First Event from Your Topic

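from pyensign.ensign import Ensign

# Connect with the credentials you set up in the earlier modules
ensign = Ensign()
cursor = await ensign.query("SELECT * FROM flight-positions")
print(await cursor.fetchone())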
Event:
	id: 0661t13avm0gxesn
	data: b'{"icao24": "ac2f83", "callsign": "", "origin_country": "United States", "time_position": 170129382...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57

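Pro Tip: If you run into async errors with this code (for example, because a plain Python script doesn’t allow top-level await), you can run it in a Python notebook such as Jupyter, which supports top-level await, to avoid writing the asyncio boilerplate.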
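Outside a notebook, the awaits need to live inside a coroutine that you hand to `asyncio.run()`. Here is a minimal sketch of that boilerplate. The `first_event` function is a hypothetical helper; it relies only on the `query()` and `fetchone()` calls shown above, so it accepts any client object that provides them.

```python
import asyncio


async def first_event(client, sql: str):
    """Run an enSQL query and return the first event (hypothetical helper).

    `client` is anything with an async query() method that returns a cursor
    with an async fetchone() method, such as the Ensign client used above.
    """
    cursor = await client.query(sql)
    return await cursor.fetchone()


# In a standalone script, drive the coroutine with asyncio.run(), e.g.:
#   from pyensign.ensign import Ensign
#   print(asyncio.run(first_event(Ensign(), "SELECT * FROM flight-positions")))
```

Taking the client as a parameter instead of constructing it inside the helper also makes it easy to test with a stand-in object.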

Retrieve the First n Events from Your Topic

cursor = await ensign.query("SELECT * FROM flight-positions")
events = await cursor.fetchmany(3)
for event in events:
    print(event)
Event:
	id: 0661t13avm0gxesn
	data: b'{"icao24": "ac2f83", "callsign": "", "origin_country": "United States", "time_position": 170129382...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
Event:
	id: 0661t13awg0gxesp
	data: b'{"icao24": "c02e9b", "callsign": "JZA635  ", "origin_country": "Canada", "time_position": 17012938...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
Event:
	id: 0661t13aww0gxesq
	data: b'{"icao24": "ad3add", "callsign": "N9511D  ", "origin_country": "United States", "time_position": 1...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57

Retrieve All Events from Your Topic

You can also iterate over the cursor itself to asynchronously retrieve the stream of events, which is like subscribing at a previous point in time.

cursor = await ensign.query("SELECT * FROM flight-positions")
async for event in cursor:
    print(event)
Event:
	id: 0661t13avm0gxesn
	data: b'{"icao24": "ac2f83", "callsign": "", "origin_country": "United States", "time_position": 170129382...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
Event:
	id: 0661t13awg0gxesp
	data: b'{"icao24": "c02e9b", "callsign": "JZA635  ", "origin_country": "Canada", "time_position": 17012938...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
Event:
	id: 0661t13aww0gxesq
	data: b'{"icao24": "ad3add", "callsign": "N9511D  ", "origin_country": "United States", "time_position": 1...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
Event:
	id: 0661t13ax40gxesr
	data: b'{"icao24": "aa9dc1", "callsign": "XOJ783  ", "origin_country": "United States", "time_position": 1...
	mimetype: application/json
...
	mimetype: application/json
	schema: FlightVector v0.1.0
	state: EventState.INITIALIZED
	created: 2023-11-29 15:37:57
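Each event’s data field holds the raw payload bytes. For this topic the mimetype is application/json, so you can decode it with the standard library. The hypothetical `decode_flight` helper below is one way to do that; it also strips the trailing padding visible in callsigns such as "JZA635  ". You would call it on `event.data` inside the `async for` loop. The sample is a shortened payload containing only three of the fields shown in the output above.

```python
import json
from typing import Any, Dict


def decode_flight(data: bytes) -> Dict[str, Any]:
    """Decode a JSON flight payload into a dict (hypothetical helper)."""
    record = json.loads(data)  # json.loads accepts UTF-8 encoded bytes
    # Callsigns in this feed are space-padded, e.g. "JZA635  ".
    if isinstance(record.get("callsign"), str):
        record["callsign"] = record["callsign"].strip()
    return record


sample = b'{"icao24": "c02e9b", "callsign": "JZA635  ", "origin_country": "Canada"}'
print(decode_flight(sample))
```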

Customizing Queries

There are many ways to customize queries. Here are some of the basics; check out the enSQL documentation for more details and the latest features.

Filtering

Every event has a schema type and version, which are visible on the event itself and on the topic dashboard. You can filter on schema type and version; for example, to select only the events of the FlightVector type:

SELECT * FROM flight-positions.FlightVector
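For intuition, the sketch below applies the same type (and optional version) filter client-side. `StubEvent`, its `type_name` and `version` fields, and the `WeatherReport` type are all hypothetical stand-ins, not PyEnsign classes. In practice the enSQL filter is the better choice, since it lets Ensign do the filtering instead of your client.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class StubEvent:
    """Hypothetical stand-in for an event's schema metadata and payload."""
    type_name: str
    version: str
    data: bytes


def filter_by_type(
    events: Iterable[StubEvent], type_name: str, version: Optional[str] = None
) -> List[StubEvent]:
    # Keep events whose schema type matches and, if given, whose version matches too.
    return [
        e for e in events
        if e.type_name == type_name and (version is None or e.version == version)
    ]


events = [
    StubEvent("FlightVector", "0.1.0", b"{}"),
    StubEvent("WeatherReport", "0.1.0", b"{}"),  # hypothetical second event type
    StubEvent("FlightVector", "0.2.0", b"{}"),
]
print(len(filter_by_type(events, "FlightVector")))           # 2
print(len(filter_by_type(events, "FlightVector", "0.1.0")))  # 1
```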

Windowing

If the first event was published in error and you don’t want to process it, use the OFFSET keyword to skip it.

SELECT * FROM flight-positions OFFSET 1

To specify a window of events, combine OFFSET with LIMIT. For example, the query below skips event 0 and returns the next three events (events 1 through 3, counting from 0).

SELECT * FROM flight-positions OFFSET 1 LIMIT 3
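Because results come back in publish order, OFFSET and LIMIT behave much like slicing a list. The illustrative `window` function below (not part of PyEnsign) reproduces the two queries above on stand-in events numbered from 0.

```python
from itertools import islice
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def window(events: Iterable[T], offset: int = 0, limit: Optional[int] = None) -> List[T]:
    """Emulate OFFSET/LIMIT over an ordered sequence of events (illustration only)."""
    # Skip the first `offset` events, then take at most `limit` of the rest.
    stop = None if limit is None else offset + limit
    return list(islice(events, offset, stop))


positions = list(range(6))  # stand-ins for events 0..5, in publish order
print(window(positions, offset=1))           # [1, 2, 3, 4, 5]
print(window(positions, offset=1, limit=3))  # [1, 2, 3]
```

Using islice rather than list slicing means the same function also works on a lazy iterator, which is closer to how a stream is consumed.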

DataFrames

PyEnsign DataFrames are an additional way to serialize results from an enSQL query. They are particularly useful for things like batchwise data analytics or training traditional machine learning models. To use them, you need to install the [ml] extension.

$ pip install "pyensign[ml]"

Then simply create the DataFrame from the query cursor.

from pyensign.ml.dataframe import DataFrame
cursor = await ensign.query("SELECT * FROM flight-positions")
df = await DataFrame().from_events(cursor)
df.head()

[Figure: DataFrame]

By default, the columns are the keys in the data and the rows are the events, ordered by ID.
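To see what that layout means, the sketch below builds the same row-and-column structure by hand from a couple of payloads using only the standard library. It illustrates the described layout; it is not how PyEnsign implements from_events. The IDs and shortened payloads come from the output earlier in this module, and the sketch assumes event IDs sort lexicographically in publish order, as those IDs do.

```python
import json
from typing import Dict, List, Tuple


def to_columns(events: List[Tuple[str, bytes]]) -> Dict[str, list]:
    """Turn (event_id, json_payload) pairs into columns keyed by payload field."""
    # Rows follow event ID order; columns are the union of the payload keys.
    rows = [json.loads(data) for _, data in sorted(events, key=lambda e: e[0])]
    keys: List[str] = []
    for row in rows:
        for key in row:
            if key not in keys:
                keys.append(key)
    # Missing fields become None, much as pandas would fill in NaN.
    return {key: [row.get(key) for row in rows] for key in keys}


events = [
    ("0661t13awg0gxesp", b'{"icao24": "c02e9b", "origin_country": "Canada"}'),
    ("0661t13avm0gxesn", b'{"icao24": "ac2f83", "origin_country": "United States"}'),
]
print(to_columns(events))
```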

PyEnsign DataFrames have the same capabilities as pandas DataFrames, so you can apply transformations to the columns, merge them, sample them, etc.

df['origin_country'].value_counts()

origin_country
United States           75
Austria                  4
Canada                   4
Ireland                  4
United Kingdom           4
Portugal                 2
Germany                  2
Malta                    2
Iraq                     1
Spain                    1
China                    1
Poland                   1
Hungary                  1
United Arab Emirates     1
Turkey                   1
Ethiopia                 1
Mexico                   1
Republic of Korea        1
Qatar                    1
Egypt                    1
Name: count, dtype: int64
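If you only need a quick tally and don’t want the [ml] extra, the standard library’s collections.Counter gives the same kind of frequency count. The hypothetical `count_field` helper below works on raw JSON payloads; the three sample payloads are shortened versions of events shown earlier in this module, not the full topic.

```python
import json
from collections import Counter
from typing import Iterable


def count_field(payloads: Iterable[bytes], field: str) -> Counter:
    """Tally the values of one JSON field across event payloads."""
    return Counter(json.loads(p).get(field) for p in payloads)


payloads = [
    b'{"icao24": "ac2f83", "origin_country": "United States"}',
    b'{"icao24": "c02e9b", "origin_country": "Canada"}',
    b'{"icao24": "ad3add", "origin_country": "United States"}',
]
# most_common() sorts by descending count, like value_counts()
for country, n in count_field(payloads, "origin_country").most_common():
    print(country, n)
```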

Back to the Future

⭐🎉⭐ Congrats on completing PubSub101! ⭐🎉⭐

Now that you’ve created a project with pub/sub dataflows, there’s nothing stopping you from creating real-time applications. We can’t wait to see what you do with Ensign! Don’t hesitate to reach out to support@rotational.io if you have further questions.

Photo by Pangstu

About This Post

In this module you will query your Ensign topics using SQL-like syntax to get historic events. Great Scott, it's time travel!
