Continuous Transforms
Continuous transforms can be used to continuously transform incoming time-series data without storing it. Since no data is stored, continuous transforms don’t support aggregations. The result of the transformation can be piped into another stream or written to an external data store.
Creating Continuous Transforms
Transforms are defined as PostgreSQL views with the action=transform parameter. Here’s the syntax for creating a continuous transform:
CREATE VIEW name WITH (action=transform [, outputfunc=function_name( arguments ) ]) AS query
Where query is a subset of a PostgreSQL SELECT statement:
SELECT expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
Where, in turn:
expression: an expression in the SELECT statement; it can’t contain an aggregate.
from_item: the source the query reads from, such as a stream, optionally joined with a table.
function_name: an optional user-supplied function, declared as taking no arguments and returning type trigger, which is executed for every row that is output by the continuous transform.
arguments: an optional comma-separated list of arguments to be provided to the function when the trigger is executed. Arguments can only be literal string constants.
Note
You can think of continuous transforms as being triggers on top of incoming streaming data, where the trigger function is executed for each new row output by the continuous transform. Internally the function is executed as an AFTER INSERT FOR EACH ROW trigger, so there is no OLD row and the NEW row contains the row output by the continuous transform.
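To make the trigger analogy concrete, here is a minimal sketch of an output function that reads the NEW row and one string argument. All names (sensor_stream, log_row, tagged_rows) are hypothetical. It assumes that streams are created as foreign tables on the pipelinedb server and that the string literals given to outputfunc reach the function through TG_ARGV, as they would for an ordinary PostgreSQL trigger; neither assumption is confirmed by this page.

```sql
-- Hypothetical stream; the foreign-table syntax is an assumption.
CREATE FOREIGN TABLE sensor_stream (id integer, reading numeric) SERVER pipelinedb;

-- Declared with no arguments and returning trigger, as the syntax above requires.
CREATE OR REPLACE FUNCTION log_row()
RETURNS trigger AS
$$
BEGIN
  -- NEW holds the row emitted by the transform; OLD is not available.
  -- TG_ARGV[0] is assumed to carry the first literal passed via outputfunc.
  RAISE NOTICE 'tag=% id=% reading=%', TG_ARGV[0], NEW.id, NEW.reading;
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;

-- Only rows with reading > 100 are passed to log_row.
CREATE VIEW tagged_rows WITH (action=transform, outputfunc=log_row('high')) AS
  SELECT id, reading FROM sensor_stream WHERE reading > 100;
```

Because the function runs inside the transform rather than in your session, any notices it raises would most likely appear in the server log rather than in the client that inserted the rows.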
Dropping Continuous Transforms
To DROP a continuous transform from the system, use the DROP VIEW command. Its syntax is simple:
DROP VIEW continuous_transform;
This will remove the continuous transform from the system along with all of its associated resources.
Viewing Continuous Transforms
To view the continuous transforms and their definitions currently in the system, you can run the following query:
SELECT * FROM pipelinedb.transforms;
Continuous Transform Output Streams
All continuous transforms have Output Streams associated with them, making it easy for other transforms or continuous views to read from them. A continuous transform’s output stream simply contains whatever rows the transform selects.
For example, here’s a simple transform that joins incoming rows with a table:
CREATE VIEW t WITH (action=transform) AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
This transform now writes values from the joined table out to its output stream, which can be read using output_of:
CREATE VIEW v WITH (action=materialize) AS
SELECT sum(y) FROM output_of('t');
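Since other transforms can also read an output stream, transforms can be chained. The following is a sketch only: t_positive and positive_count are hypothetical names, and it assumes the transform t above exists and that y is numeric.

```sql
-- Second-stage transform: keeps only positive values emitted by t.
CREATE VIEW t_positive WITH (action=transform) AS
  SELECT y FROM output_of('t') WHERE y > 0;

-- Continuous view that aggregates the second stage's output stream.
CREATE VIEW positive_count WITH (action=materialize) AS
  SELECT count(*) FROM output_of('t_positive');
```

Each stage stays free of aggregates, and the aggregation happens only in the continuous view at the end of the chain.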
Built-in Transform Output Functions
To provide more flexibility over a continuous transform’s output than its built-in output stream provides, Tantor PipelineDB exposes an interface for receiving a transform’s rows through a trigger function. Trigger functions attached to transforms can then do whatever you’d like with the rows they receive, including writing them out to other streams.
Currently, Tantor PipelineDB provides only one built-in trigger function, pipelinedb.insert_into_stream, that can be used with continuous transforms. It inserts the output of the continuous transform into all the streams that are provided as the string literal arguments. For example:
CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream')) AS
SELECT x, y FROM stream WHERE mod(x, 2) = 0;
This continuous transform will insert all values of (x, y) into even_stream where x is even.
Important
All arguments to pipelinedb.insert_into_stream must be valid names of streams that already exist in the system; otherwise an error will be thrown.
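One way to satisfy this requirement is to create every target stream before the transform that writes to it. The sketch below uses hypothetical names (numbers_stream, audit_stream, t_even) and assumes streams are declared as foreign tables on the pipelinedb server, which this page does not confirm. It also shows more than one target stream being passed as string literals.

```sql
-- Input stream and both target streams are created first (syntax is an assumption).
CREATE FOREIGN TABLE numbers_stream (x integer, y integer) SERVER pipelinedb;
CREATE FOREIGN TABLE even_stream (x integer, y integer) SERVER pipelinedb;
CREATE FOREIGN TABLE audit_stream (x integer, y integer) SERVER pipelinedb;

-- Rows with even x are written to every stream named in the argument list.
CREATE VIEW t_even WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream', 'audit_stream')) AS
  SELECT x, y FROM numbers_stream WHERE mod(x, 2) = 0;
```

Creating the transform before its target streams exist would be expected to fail with the error described above.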
Creating Your Own Output Function
You can also create your own output function that can be used with continuous transforms. For example, if you want to insert the output into a table, you could do something like:
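CREATE TABLE t (username text, value int);
-- Output function: copies each row emitted by the transform into table t.
CREATE OR REPLACE FUNCTION insert_into_t()
RETURNS trigger AS
$$
BEGIN
  -- NEW is the row output by the continuous transform.
  INSERT INTO t (username, value) VALUES (NEW.username, NEW.value);
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;
-- Only rows with value > 100 reach insert_into_t.
CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS
  SELECT username::text, value::int FROM stream WHERE value > 100;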