Continuous Views

Tantor PipelineDB’s fundamental abstraction is called a continuous view. A continuous view is much like a regular view, except that it selects from a combination of streams and tables as its inputs and is incrementally updated in realtime as new data is written to those inputs.

As soon as a stream row has been read by the continuous views that must read it, it is discarded. Raw, granular data is not stored anywhere. The only data that is persisted for a continuous view is whatever is returned by running a SELECT * FROM that_view. Thus you can think of a continuous view as a very high-throughput, realtime materialized view.

Creating Continuous Views

Continuous views are defined as PostgreSQL views with the action=materialize parameter. Here’s the syntax for creating a continuous view:

CREATE VIEW name [WITH (action=materialize [, ...])] AS query

Note

The default action is materialize, and thus action may be omitted for creating continuous views. As long as a stream is being selected from, Tantor PipelineDB will interpret the CREATE VIEW statement with an action of materialize.

Where query is a subset of a PostgreSQL SELECT statement:

SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
    expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]

Where, in turn:

  • from_item can be one of:

    • stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]

    • table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]

    • from_item [ NATURAL ] join_type from_item [ ON join_condition ]

    Note

    This section references streams, which are similar to tables and are what continuous views and transforms read from in their FROM clause. They’re explained in more depth in the Streams section, but you can think of them as append-only tables for now.

  • expression — PostgreSQL expression or grouping sets specification.

  • output_name — an optional identifier to name an expression with.

  • condition — any expression that evaluates to a result of type boolean. Any row that does not satisfy this condition will be eliminated from the output. A row satisfies the condition if it returns true when the actual row values are substituted for any variable references.

Note

This section has covered only the syntax for creating continuous views. To learn more about the semantics of each of these query elements, consult the PostgreSQL SELECT documentation.
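As a rough end-to-end illustration of the syntax above, the following sketch creates a stream, defines a continuous view over it, writes a few events, and reads the result. The names page_views_stream and views_per_url are hypothetical, and the stream declaration assumes the foreign-table syntax used by upstream PipelineDB 1.x; the Streams section is the authoritative reference for how streams are created.

```sql
-- Assumption: streams are declared as foreign tables on the pipelinedb server,
-- as in upstream PipelineDB 1.x. All object names here are hypothetical.
CREATE FOREIGN TABLE page_views_stream (url text, user_id integer)
  SERVER pipelinedb;

-- Continuous view: one row per url, updated incrementally as events arrive.
CREATE VIEW views_per_url WITH (action=materialize) AS
  SELECT url, COUNT(*) AS view_count
  FROM page_views_stream
  GROUP BY url;

-- Events are written to the stream with ordinary INSERT statements.
INSERT INTO page_views_stream (url, user_id)
  VALUES ('/home', 1), ('/home', 2), ('/about', 1);

-- Only the aggregated rows are stored; the raw events are not.
SELECT * FROM views_per_url ORDER BY url;
```

Stream rows are processed asynchronously, so the final SELECT may briefly lag behind the most recent INSERT.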

Dropping Continuous Views

To DROP a continuous view from the system, use the DROP VIEW command. Its syntax is simple:

DROP VIEW name

This will remove the continuous view from the system along with all of its associated resources.

Truncating Continuous Views

To remove all of a continuous view’s data without removing the continuous view itself, the truncate_continuous_view function can be used:

SELECT truncate_continuous_view('name');

This command efficiently removes all of the continuous view’s rows, and is therefore analogous to the PostgreSQL TRUNCATE command.

Viewing Continuous Views

To view the continuous views and their definitions currently in the system, you can run the following query:

SELECT * FROM pipelinedb.views;

Data Retrieval

Since continuous views are a lot like regular views, retrieving data from them is simply a matter of performing a SELECT on them:

SELECT * FROM some_continuous_view

 user | event_count
------+-------------
 a    |          10
 b    |          20
 c    |          30

Any SELECT statement is valid on a continuous view, allowing you to perform further analysis on their perpetually updating contents:

SELECT t.name, sum(v.value) + sum(t.table_value) AS total
FROM some_continuous_view v JOIN some_table t ON v.id = t.id GROUP BY t.name

 name  | total
-------+-------
 usman |    10
 jeff  |    20
 derek |    30

Time-to-Live (TTL) Expiration

A common Tantor PipelineDB pattern is to include a time-based column in aggregate groupings and remove old rows that are no longer needed, as determined by that column. While there are a number of ways to achieve this behavior, Tantor PipelineDB provides native support for row expiration via time-to-live (TTL) criteria specified at the continuous view level.

TTL expiration behavior can be assigned to continuous views via the ttl and ttl_column storage parameters. Expiration is handled by one or more “reaper” processes that will DELETE any rows having a ttl_column value that is older than the interval specified by ttl (relative to wall time). Here’s an example of a continuous view definition that will tell the reaper to delete any rows whose minute column is older than one month:

CREATE VIEW v_ttl WITH (ttl = '1 month', ttl_column = 'minute') AS
  SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;

Note that TTL behavior is a hint to the reaper, and thus will not guarantee that rows will be physically deleted exactly when they are expired.

If you’d like to guarantee that no TTL-expired rows will be read, you should create a view over the continuous view with a WHERE clause that excludes expired rows at read time.
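A minimal sketch of such a read-time filter for the v_ttl example above. The wrapper name v_ttl_fresh is hypothetical, and the filter assumes the TTL column is exposed as minute with a TTL of one month, matching that definition. Because the wrapper does not select from a stream, it is expected to be created as an ordinary, non-continuous view.

```sql
-- Hypothetical wrapper view: hides rows that have expired
-- but have not yet been physically deleted by the reaper.
CREATE VIEW v_ttl_fresh AS
  SELECT *
  FROM v_ttl
  WHERE minute > now() - interval '1 month';
```

Queries that must never see expired rows would then read from v_ttl_fresh instead of v_ttl. The interval in the WHERE clause has to be kept in sync with the view's ttl setting by hand.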

Modifying TTLs

TTLs can be added, modified, and removed from continuous views via the pipelinedb.set_ttl function:

pipelinedb.set_ttl ( cv_name, ttl, ttl_column )

Update the given continuous view’s TTL with the given parameters. ttl is an interval expressed as a string (e.g. '1 day'), and ttl_column is the name of a timestamp-based column.

Passing NULL for both the ttl and ttl_column parameters will effectively remove a TTL from the given continuous view. Note that a TTL cannot be modified on or removed from a sliding-window continuous view.
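For illustration, the calls below apply this function to the v_ttl view defined earlier. This is a sketch based only on the signature shown above; the exact argument types are assumed to accept the string literals used here.

```sql
-- Shorten the TTL of v_ttl to one day, still keyed on the minute column.
SELECT pipelinedb.set_ttl('v_ttl', '1 day', 'minute');

-- Remove the TTL from v_ttl by passing NULL for both parameters.
SELECT pipelinedb.set_ttl('v_ttl', NULL, NULL);
```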

Activation and Deactivation

Because continuous views are continuously processing input streams, it can be useful to have a notion of starting and stopping that processing without having to completely shut down Tantor PipelineDB. For example, if a continuous view incurs an unexpected amount of system load or begins throwing errors, it may be useful to temporarily stop continuous processing for that view (or all of them) until the issue is resolved.

This level of control is provided by the activate and deactivate functions, which are synonymous with “play” and “pause”. When continuous views are active, they are actively reading from their input streams and incrementally updating their results accordingly. Conversely, inactive continuous views are not reading from their input streams and are not updating their results. Tantor PipelineDB remains functional when continuous views are inactive, and continuous views themselves are still readable — they’re just not updating.

The function signatures take only a continuous view or transform name:

SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');

Continuous Transforms can also be activated and deactivated.

Important

When continuous queries (views or transforms) are inactive, any events written to their input streams while they’re inactive will never be read by that continuous query, even after they’re activated again.

See Operational Functions for more information.
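The consequence of writing to a stream while a reader is inactive can be sketched as follows. The names some_stream (with an integer column x) and counts_cv (a continuous view reading from that stream) are hypothetical and assumed to exist already.

```sql
-- Pause processing for the hypothetical continuous view counts_cv.
SELECT pipelinedb.deactivate('counts_cv');

-- These events arrive while counts_cv is inactive,
-- so counts_cv will never read them.
INSERT INTO some_stream (x) VALUES (1), (2), (3);

-- Resume processing.
SELECT pipelinedb.activate('counts_cv');

-- Only events written from this point on are reflected in counts_cv.
INSERT INTO some_stream (x) VALUES (4);
```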

Partitioning

Tantor PipelineDB supports partitioning of continuous views. Similar to regular partitioned tables in PostgreSQL, partitioned continuous views allow splitting one large view into smaller physical pieces to reduce the update overhead, improve query performance and simplify removing data that is no longer needed.

Tantor PipelineDB automatically creates partitions as necessary based on the actual data values coming from streams.

Only a subset of PostgreSQL partitioning features is supported. Continuous views can be partitioned into ranges over a timestamp-based column using fixed-size time intervals. Here’s the syntax for creating a partitioned continuous view:

CREATE VIEW name WITH (partition_by = column, partition_duration = 'interval') AS query;

Where partition_by must refer to either a timestamp or timestamptz column that must be one of the output columns of query, and partition_duration is a partition range specification in the PostgreSQL interval data type format.

For example, the following definition creates a continuous view partitioned over the ts column where each partition corresponds to a one month interval:

CREATE VIEW v_part WITH (partition_by = ts, partition_duration = '1 month')
  AS SELECT ts, COUNT(*) FROM stream GROUP BY ts;

Unlike PostgreSQL range-partitioned tables, continuous views currently do not support manual specification of range bounds. Tantor PipelineDB automatically assigns partition range bounds as follows:

  • For some common partition intervals, some special casing is performed so that users get predictable and expected partition bounds. For example, with a 1 week partition interval users will generally expect each weekly partition to start/end at the beginning/end of each calendar week. This also provides sane performance expectations, as users will generally want one calendar week to span at most one partition. Below is a list of all supported special cases:

    • 1 year

    • 1 month

    • 1 week

    • 1 day

    • 1 hour

    • 1 minute

    • 1 second

    For example, for a row with a timestamp of '2024-04-11 13:53:07.82049', Tantor PipelineDB will choose (and automatically create, if necessary) a partition with bounds ['2024-01-01', '2025-01-01'), if partition_duration is ‘1 year’, or ['2024-04-01', '2024-05-01'), if partition_duration is ‘1 month’, and so on.

  • For other intervals, range bounds are chosen automatically by rounding to the nearest lower and upper timestamps of the specified granularity. For example, with a partitioning interval of ‘3 minutes’ and a timestamp value of 2024-04-11 13:53:07.82049, Tantor PipelineDB will choose a partition with range bounds corresponding to ['2024-04-11 13:51:00', '2024-04-11 13:54:00') and create it, if it doesn’t exist yet.
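The bounds in both examples above can be reproduced with standard PostgreSQL functions. The sketch below only mirrors the documented results and is not how Tantor PipelineDB computes bounds internally; it assumes PostgreSQL 14 or later for date_bin, and the origin timestamp passed to date_bin is an arbitrary midnight-aligned value chosen for illustration.

```sql
-- Special-cased interval ('1 month'): bounds follow calendar boundaries.
SELECT date_trunc('month', ts)                      AS lower_bound,
       date_trunc('month', ts) + interval '1 month' AS upper_bound
FROM (VALUES (TIMESTAMP '2024-04-11 13:53:07.82049')) AS t(ts);
-- lower_bound = 2024-04-01 00:00:00, upper_bound = 2024-05-01 00:00:00

-- Other interval ('3 minutes'): round down to a multiple of the interval.
SELECT date_bin('3 minutes', ts, TIMESTAMP '2001-01-01')                        AS lower_bound,
       date_bin('3 minutes', ts, TIMESTAMP '2001-01-01') + interval '3 minutes' AS upper_bound
FROM (VALUES (TIMESTAMP '2024-04-11 13:53:07.82049')) AS t(ts);
-- lower_bound = 2024-04-11 13:51:00, upper_bound = 2024-04-11 13:54:00
```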

Continuous Transforms cannot be partitioned as they do not store any data.

Alternative Access Methods

PostgreSQL supports alternative mechanisms to store data on disk called table access methods. Tantor PipelineDB uses the default “heap” access method to create on-disk tables and partitions for materialized views, but after their creation users are able to convert them to other access methods. For example, the user may want to store historic, read-only data in certain partitions in a more space-efficient format provided by the Columnar extension that is also more suitable for analytical queries.

The following restrictions apply:

  • Updating any on-disk data stored in an access method other than “heap” is currently not supported by Tantor PipelineDB. If a combiner process encounters a non-heap table that must be updated with incoming stream data, it throws the "unsupported access method for relation" error and rolls back the current transaction, discarding incoming data in the current batch. In practice, this means that only partitions with old, historic, read-only data can be safely converted to alternative access methods such as Columnar. Updates to such partitions will be lost, possibly along with updates to regular heap tables in the same batch.

  • Care must be taken to convert partitions in an atomic, transaction-safe way to avoid disrupting updates to the parent continuous view. This can be done either manually or with specialized tools like pg_archive. The manual procedure involves the following steps:

    • START TRANSACTION

    • LOCK TABLE ... IN EXCLUSIVE MODE for the target partition

    • create a new table with the same schema and a new access method

    • copy data from the old partition into the new table using, for example, INSERT INTO ... SELECT * FROM ...

    • detach the old partition into a standalone table using ALTER TABLE ... DETACH PARTITION (the CONCURRENTLY option cannot be used here, because PostgreSQL does not allow it inside a transaction block)

    • optionally, drop the detached partition using DROP TABLE if you don’t want to keep it for backup purposes

    • rename the new table to have the same name as the old one

    • re-attach the new table in place of the old partition using ALTER TABLE ... ATTACH PARTITION

    • COMMIT
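The manual procedure could look roughly like the sketch below. Every relation name in it (cv_parent for the partitioned table behind the continuous view, cv_part_2024_01 for the target partition) is a hypothetical placeholder, the columnar access method assumes the Columnar extension is installed, and the partition bounds must be replaced with the actual bounds of the partition being converted. The plain form of DETACH PARTITION is used because PostgreSQL rejects DETACH PARTITION ... CONCURRENTLY inside a transaction block.

```sql
-- All relation names and partition bounds below are hypothetical placeholders.
START TRANSACTION;

-- Block concurrent writes to the partition being converted.
LOCK TABLE cv_part_2024_01 IN EXCLUSIVE MODE;

-- New table with the same columns, stored with a different access method.
-- LIKE without options copies column names, types, and NOT NULL constraints only.
CREATE TABLE cv_part_2024_01_new (LIKE cv_part_2024_01) USING columnar;

-- Copy the existing rows.
INSERT INTO cv_part_2024_01_new SELECT * FROM cv_part_2024_01;

-- Detach the old partition so that it becomes a standalone table.
ALTER TABLE cv_parent DETACH PARTITION cv_part_2024_01;

-- Keep the old data as a backup under a different name
-- (or run DROP TABLE cv_part_2024_01 instead if no backup is needed).
ALTER TABLE cv_part_2024_01 RENAME TO cv_part_2024_01_old;

-- Give the new table the original partition name and attach it
-- with the same bounds the old partition had.
ALTER TABLE cv_part_2024_01_new RENAME TO cv_part_2024_01;
ALTER TABLE cv_parent ATTACH PARTITION cv_part_2024_01
  FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

COMMIT;
```

If any step fails, the whole transaction rolls back and the original heap partition stays attached.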

Examples

Putting this all together, let’s go through a few examples of continuous views and understand what each one accomplishes.

Important

It is important to understand that the only data persisted by Tantor PipelineDB for a continuous view is whatever would be returned by running a SELECT * FROM my_cv on it (plus a small amount of metadata). This is a relatively new concept, but it is at the core of what makes continuous views so powerful!

Emphasizing the above notice, this continuous view would only ever store a single row in Tantor PipelineDB (just a few bytes), even if it read a trillion events over time:

CREATE VIEW avg_of_forever AS SELECT AVG(x) FROM one_trillion_events_stream;

  • Calculate the number of unique users seen per url referrer each day using only a constant amount of space per day:

    CREATE VIEW uniques AS
    SELECT date_trunc('day', arrival_timestamp) AS day,
      referrer, COUNT(DISTINCT user_id)
    FROM users_stream GROUP BY day, referrer;
    
  • Compute the linear regression of a stream of datapoints bucketed by minute:

    CREATE VIEW lreg AS
    SELECT date_trunc('minute', arrival_timestamp) AS minute,
      regr_slope(y, x) AS mx,
      regr_intercept(y, x) AS b
    FROM datapoints_stream GROUP BY minute;
    
  • How many ad impressions have we served in the last five minutes?

    CREATE VIEW imps AS
    SELECT COUNT(*) FROM imps_stream
    WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');
    
  • What are the 90th, 95th, and 99th percentiles of my server’s request latency?

    CREATE VIEW latency AS
    SELECT percentile_cont(array[0.90, 0.95, 0.99]) WITHIN GROUP (ORDER BY latency)
    FROM latency_stream;