Непрерывные преобразования
Непрерывные преобразования могут использоваться для непрерывного преобразования входящих данных без их сохранения. Поскольку данные не сохраняются, непрерывные преобразования не поддерживают агрегатные функции. Результат преобразования может быть направлен в другой поток или записан во внешнее хранилище данных.
Создание непрерывных преобразований
Преобразования создаются как представления PostgreSQL с параметром action=transform
.Ниже приведен синтаксис создания непрерывного преобразования:
CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query
Где query
является подмножеством оператора SELECT
PostgreSQL:
SELECT expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
Где, в свою очередь:
expression
в операторе SELECT не может содержать агрегатные функции.from_item
может быть одним из представленных ниже вариантов:stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
function_name
— это необязательная пользовательская функция, которая не принимает аргументы и возвращает типtrigger
. Она выполняется для каждой строки, выводимой непрерывным преобразованием.arguments
— это необязательный список аргументов, разделенных запятыми, которые требуются функции при выполнении триггера. Аргументы могут быть только текстовыми строковыми константами.
Примечание
Можно рассматривать непрерывные преобразования как триггеры для поступающих потоков данных, где функция триггера выполняется для каждой новой строки, выводимой непрерывным преобразованием. Внутренне функция выполняется как триггер AFTER INSERT FOR EACH ROW
, поэтому нет строки OLD
, а строка NEW
содержит строку, выводимую непрерывным преобразованием.
Удаление непрерывных преобразований
Чтобы удалить (DROP
) непрерывное преобразование из системы, используйте команду DROP VIEW
. Его синтаксис прост:
DROP VIEW continuous_transform;
Это удалит непрерывное преобразование из системы вместе со всеми связанными ресурсами.
Просмотр непрерывных преобразований
Чтобы просмотреть непрерывные преобразования, находящиеся в системе, выполните следующий запрос:
SELECT * FROM pipelinedb.transforms;
Выходные потоки непрерывного преобразования
У всех непрерывных преобразований есть связанные с ними выходные потоки, что облегчает чтение из них другими преобразованиям или непрерывными представлениями. Выходной поток непрерывного преобразования просто содержит строки, которые выбирает преобразование.
Например, вот простое преобразование, которое соединяет входные строки с таблицей:
CREATE VIEW t WITH (action=transform) AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
Это преобразование будет записывать значения из соединенной таблицы в свой выходной поток, который можно прочитать с помощью output_of
:
CREATE VIEW v WITH (action=materialize) AS
SELECT sum(y) FROM output_of('t');
Встроенные триггеры преобразования
Для обеспечения большей гибкости при выводе непрерывных преобразований, относительно встроенных выходных потоков, Tantor PipelineDB предоставляет интерфейс для получения строк преобразований с использованием триггера. Триггеры в связке с преобразованиями, могут выполнять любые операции с полученными ими строками, включая запись в другие потоки.
В настоящее время Tantor PipelineDB предоставляет только один встроенный триггер pipelinedb.insert_into_stream
, который можно использовать с непрерывными преобразованиями. Он вставляет вывод непрерывного преобразования во все потоки, которые предоставлены как аргументы строковых литералов. Например:
CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream)) AS
SELECT x, y FROM stream WHERE mod(x, 2) = 0;
Это непрерывное преобразование вставит все значения (x, y)
в поток even_stream
, где x
— четное.
Важно
Все аргументы для pipelinedb.insert_into_stream
должны быть допустимыми названиями потоков, которые уже существуют в системе, в противном возникнет ошибка.
Создание собственного триггера
Вы также можете создать собственный триггер, который будет использоваться с непрерывными преобразованиями. Например, если необходимо вставить вывод в таблицу, можно написать так:
CREATE TABLE t (user text, value int);
CREATE OR REPLACE FUNCTION insert_into_t()
RETURNS trigger AS
$$
BEGIN
INSERT INTO t (user, value) VALUES (NEW.user, NEW.value);
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS
SELECT user::text, value::int FROM stream WHERE value > 100;