Непрерывные преобразования
Добавлено в версии 0.9.0.
Непрерывные преобразования могут использоваться для непрерывного преобразования входящих данных без их сохранения. Поскольку данные не сохраняются, непрерывные преобразования не поддерживают агрегатные функции. Результат преобразования может быть направлен в другой поток или записан во внешнее хранилище данных.
CREATE CONTINUOUS TRANSFORM
Ниже приведен синтаксис создания непрерывного преобразования:
CREATE CONTINUOUS TRANSFORM name AS query [ THEN EXECUTE PROCEDURE function_name ( arguments ) ]
запрос является подмножеством оператора SELECT PostgreSQL:
SELECT expression [ [ AS ] output_name ] [, ...] [ FROM from_item\[, ...] ] [ WHERE condition ] [ GROUP BY expression [, ...] ]
где никакое выражение в операторе SELECT не может содержать агрегат, а from_item может быть одним из представленных ниже вариантов:
stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
function_name - это необязательная пользовательская функция, которая не принимает аргументы и возвращает тип trigger
, которая выполняется для каждой строки, выводимой непрерывным преобразованием.
arguments - это необязательный список аргументов, разделенных запятыми, которые требуются функции при выполнении триггера. Аргументы могут быть только текстовыми строковыми константами.
Примечание
Можно рассматривать непрерывные преобразования как триггеры для поступающих потоков данных, где функция триггера выполняется для каждой новой строки, выводимой непрерывным преобразованием. Внутренне функция выполняется как триггер AFTER INSERT FOR EACH ROW
, поэтому нет строки OLD
, а строка NEW
содержит строку, выводимую непрерывным преобразованием.
Удаление непрерывного преобразования
Чтобы удалить (DROP
) непрерывное преобразование из системы, используйте команду DROP CONTINUOUS TRANSFORM
. Его синтаксис прост:
DROP CONTINUOUS TRANSFORM name
Это удалит непрерывное преобразование из системы вместе со всеми связанными ресурсами.
Просмотр непрерывных преобразований
Чтобы просмотреть непрерывные преобразования, находящиеся в системе, выполните следующий запрос:
SELECT * FROM pipeline_transforms();
Выходные потоки непрерывного преобразования
Добавлено в версии 0.9.6.
У всех непрерывных преобразований есть связанные с ними выходные потоки Выходные потоки, что облегчает чтение из них другим преобразованиям или непрерывным представлениям. Выходные потоки непрерывного преобразования просто содержит строки, которые выбирает преобразование.
Например, вот простое преобразование, которое соединяет входные строки с таблицей:
CREATE CONTINUOUS TRANSFORM t AS SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
Это преобразование будет записывать значения из соединенной таблицы в свой выходной поток, который можно прочитать, используя output_of
:
CREATE CONTINUOUS VIEW v AS SELECT sum(y) FROM output_of('t');
Встроенные триггеры преобразования
Для обеспечения большей гибкости при выводе непрерывных преобразований, относительно встроенных выходных потоков, PipelineDB предоставляет интерфейс для получения строк преобразований с использованием триггера. Триггеры в связке с преобразованиями, могут выполнять любые операции с полученными ими строками, включая запись в другие потоки.
В настоящее время PipelineDB предоставляет только один встроенный триггер pipeline_stream_insert
, который можно использовать с непрерывными преобразованиями. Он вставляет вывод непрерывного преобразования во все потоки, которые предоставлены как аргументы строковых литералов. Например:
CREATE CONTINUOUS TRANSFORM t AS SELECT x::int, y::int FROM stream WHERE mod(x, 2) = 0 THEN EXECUTE PROCEDURE pipeline_stream_insert('even_stream');
Это непрерывное преобразование вставит все значения (x, y)
в поток even_stream
, где x
четное.
Важно
Все аргументы для pipeline_stream_insert
должны быть допустимыми названиями потоков, которые уже существуют в системе, в противном случае будет сгенерирована ошибка.
Создание собственного триггера
Вы также можете создать собственный триггер, который будет использоваться с непрерывными преобразованиями. Например, если необходимо вставить вывод в таблицу, можно написать так:
CREATE TABLE t (user text, value int);
CREATE OR REPLACE FUNCTION insert_into_t() RETURNS trigger AS $$ BEGIN INSERT INTO t (user, value) VALUES (NEW.user, NEW.value); RETURN NEW; END; $$ LANGUAGE plpgsql;
CREATE CONTINUOUS TRANSFORM ct AS SELECT user::text, value::int FROM stream WHERE value > 100 THEN EXECUTE PROCEDURE insert_into_t();