Потоки

Потоки — это абстракция, которая позволяет клиентам передавать данные через непрерывные представления. Строка потока, или просто событие, выглядит точно так же, как обычная строка таблицы, и интерфейс записи данных в потоки идентичен интерфейсу записи в таблицы. Однако семантика потоков фундаментально отличается от семантики таблиц.

А именно, события существуют в потоке только до тех пор, пока они не будут обработаны всеми непрерывными представлениями, которые читают из этого потока. Даже тогда пользователям все еще невозможно сделать SELECT из потоков. Потоки служат исключительно в качестве входных данных для непрерывных представлений.

Потоки представлены в Tantor PipelineDB как внешние таблицы, управляемые внешним сервером pipelinedb. Синтаксис создания потока аналогичен созданию таблицы:

CREATE FOREIGN TABLE stream_name ( [
   { column_name data_type [ COLLATE collation ] } [, ... ]
] )
SERVER pipelinedb;

stream_name

Имя потока, который нужно создать.

column_name

Имя столбца, который будет создан в новой таблице.

data_type

Тип данных столбца. Может включать спецификаторы массива. Подробнее о типах данных, поддерживаемых Tantor PipelineDB, смотрите в разделах Встроенные функции и PostgreSQL supported types.

COLLATE collation

Предложение COLLATE назначает правила сортировки для столбца, который должен иметь тип данных, поддерживающий сортировку. Если не указано иное, используется правило сортировки по умолчанию для соответствующих типов данных столбцов.

Столбцы могут быть добавлены в потоки с помощью ALTER STREAM:

postgres=# ALTER FOREIGN TABLE stream ADD COLUMN x integer;
ALTER FOREIGN TABLE

Примечание

Столбцы не могут быть удалены из потоков.

Потоки могут быть удалены с помощью команды DROP FOREIGN TABLE. Ниже приведен пример создания простого непрерывного представления, которое берет данные из потока.

postgres=# CREATE FOREIGN TABLE stream (x integer, y integer) SERVER pipelinedb;
CREATE FOREIGN TABLE
postgres=# CREATE VIEW v AS SELECT sum(x + y) FROM stream;
CREATE VIEW

Запись в потоки

INSERT

Для записи в потоки используется упрощенная версия оператора PostgreSQL INSERT. Cинтаксис:

INSERT INTO stream_name ( column_name [, ...] )
  { VALUES ( expression [, ...] ) [, ...] | query }

Где query это запрос SELECT.

Рассмотрим несколько примеров.

Записи в потоки можно делать отдельно:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2);
INSERT INTO json_stream (payload) VALUES (
  '{"key": "value", "arr": [92, 12, 100, 200], "obj": { "nested": "value" } }'
);

Или их можно сгруппировать для улучшения производительности:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2), (3, 4, 5), (6, 7, 8)
(9, 10, 11), (12, 13, 14), (15, 16, 17), (18, 19, 20), (21, 22, 23), (24, 25, 26);

Вставки в потоки также могут содержать произвольные выражения:

INSERT INTO geo_stream (id, coords) VALUES (42, a_function(-72.09, 41.40));

INSERT INTO udf_stream (result) VALUES (my_user_defined_function('foo'));

INSERT INTO str_stream (encoded, location) VALUES
  (encode('encode me', 'base64'), position('needle' in 'haystack'));

INSERT INTO rad_stream (circle, sphere) VALUES
  (pi() * pow(11.2, 2), 4 / 3 * pi() * pow(11.2, 3));

-- Также поддерживаются подзапросы в потоки
INSERT INTO ss_stream (x) SELECT generate_series(1, 10) AS x;

INSERT INTO tab_stream (x) SELECT x FROM some_table;

Подготовленный INSERT

Записи в поток также работают с подготовленными вставками для снижения сетевой нагрузки:

PREPARE write_to_stream AS INSERT INTO stream (x, y, z) VALUES ($1, $2, $3);
EXECUTE write_to_stream(0, 1, 2);
EXECUTE write_to_stream(3, 4, 5);
EXECUTE write_to_stream(6, 7, 8);

COPY

Также можно использовать PostgreSQLs COPY для записи данных из файла в поток:

COPY stream (data) FROM '/some/file.csv'

Команда COPY может быть очень полезна для заполнения непрерывного представления из архивных данных задним числом. Вот так можно передавать сжатые архивные данные из S3 в Tantor PipelineDB:

aws s3 cp s3://bucket/logfile.gz - | gunzip | pipeline -c "COPY stream (data) FROM STDIN"

Другие клиенты

Так как Tantor PipelineDB совместим с PostgreSQL, то запись в потоки возможна из любого клиента, который работает с PostgreSQL (и, вероятно, из большинства клиентов, работающих с любой базой данных SQL), поэтому нет необходимости вручную создавать вставки в потоки. Для получения общего представления о том, как это работает, рекомендуем ознакомиться с разделом Клиенты.

Выходные потоки

Выходные потоки позволяют читать из потока инкрементальных изменений, внесенных в любом непрерывном представлении или из строк, выбранных непрерывным преобразованием. И так как выходные потоки это обычные потоки Tantor PipelineDB, они могут быть прочитаны другими непрерывными представлениями или преобразованиями. К ним можно получить доступ с помощью функции output_of, вызванной на непрерывном представлении или преобразовании.

Для непрерывных представлений каждая строка в выходном потоке всегда содержит старый и новый кортеж, отражающий изменение, внесенное в основное непрерывное представление. Если изменение соответствует вставке в непрерывное представление, старый кортеж будет NULL. Если изменение соответствует удалению (в настоящее время это возможно только при выходе кортежа скользящего окна за пределы окна), новый кортеж будет NULL.

Давайте рассмотрим простой пример, чтобы проиллюстрировать некоторые из этих концепций на практике. Рассмотрим обычное непрерывное представление, которое просто суммирует данные одного столбца потока:

CREATE VIEW v_sum AS SELECT sum(x) FROM stream;

Теперь представьте себе ситуацию, что необходимо регистрировать каждое изменение суммы более чем на 10. Можно создать другое непрерывное представление, которое читает выходной поток v_sums, чтобы выполнить эту задачу:

CREATE VIEW v_deltas AS SELECT abs((new).sum - (old).sum) AS delta
  FROM output_of('v_sum')
  WHERE abs((new).sum - (old).sum) > 10;

Примечание

Старые и новые кортежи должны быть заключены в круглые скобки.

Ознакомьтесь с разделом Выходные потоки непрерывного преобразования для получения дополнительной информации о выходных потоках на непрерывных преобразованиях.

Выходные потоки на скользящих окнах

Для непрерывных представлений без скользящих окон в выходные потоки делается запись всякий раз, когда запись в поток приводит к изменению результата непрерывных представлений. Однако, поскольку результаты непрерывных представлений со скользящими окнами также зависят от времени, в их выходные потоки автоматически делается запись по мере изменения результатов с течением времени. То есть выходные потоки непрерывных представлений со скользящими окнами будут получать записи даже в том случае, если во входные потоки запись не делается.

Дельта-потоки

Помимо старых и новых кортежей, записываемых в выходной поток непрерывных представлений, для каждого инкрементального изменения, внесенного в непрерывное представление, также генерируется дельта-кортеж. Дельта-кортеж содержит значение, представляющее «разницу» между старыми и новыми кортежами. Для простых агрегатных функций, таких как sum, разница между старым и новым значением просто является скалярным значением (new).sum - (old).sum, подобно ручному подсчету в приведенном выше примере.

Давайте посмотрим, как это выглядит на самом деле:

postgres=# CREATE VIEW v AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# CREATE VIEW v_real_deltas AS SELECT (delta).sum FROM output_of('v');
CREATE VIEW
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SELECT * FROM v_real_deltas;
sum
-----
   1
(1 row)
postgres=# INSERT INTO stream (x) VALUES (2);
INSERT 0 1
postgres=# INSERT INTO stream (x) VALUES (3);
INSERT 0 1
postgres=# SELECT * FROM v_real_deltas;
sum
-----
   1
   2
   3
(3 rows)

Как видите, v_real_deltas записывает изменения, возникающие в результате каждой вставки. Но sum — очень простой пример. Магия дельта-потоков заключается в том, что они работают для всех агрегатных функций и могут быть использованы в сочетании с операцией combine для эффективной агрегации вывода непрерывных представлений на разных уровнях детализации или группировки.

Давайте рассмотрим более интересный пример. Предположим, у нас есть непрерывное представление, считающее количество уникальных пользователей в минуту:

CREATE VIEW uniques_1m AS
  SELECT minute(arrival_timestamp) AS ts, COUNT(DISTINCT user_id) AS uniques
FROM s GROUP BY ts;

Для целей архивирования и повышения производительности может понадобиться уменьшить агрегацию этого непрерывного представления до детализации по часам после определенного периода времени. С агрегатной функцией, типа COUNT(DISTINCT), очевидно, что нельзя просто сложить полученные результаты за все минуты в часе, потому что будут дублирующиеся уникальные значения между исходными границами минут. Вместо этого можно объединить операцией combine уникальные дельта-значения, полученные из вывода непрерывного представления на уровне минуты:

CREATE VIEW uniques_hourly AS
  SELECT hour((new).ts) AS ts, combine((delta).uniques) AS uniques
FROM output_of('uniques_1m') GROUP BY ts;

Непрерывное представление uniques_hourly теперь будет содержать почасовые уникальные строки, которые содержат точно такую же информацию, как если бы все исходные необработанные значения были агрегированы по часам. Но вместо дублирования работы, выполненной при чтении необработанных событий, дополнительно агрегируется вывод на уровне минут.

stream_targets

Иногда может потребоваться обновить только выбранный набор непрерывных представлений при записи в поток, например, при повторном воспроизведении исторических данных во вновь созданном непрерывном представлении. Можно использовать параметр конфигурации stream_targets, чтобы указать непрерывные представления, которые должны обновляться при записи в потоки. Укажите stream_targets в разделенном запятыми списке непрерывных представлений, которые необходимо обновить при вставке в потоки.

postgres=# CREATE VIEW v0 AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# CREATE VIEW v1 AS SELECT COUNT(*) FROM stream;
CREATE VIEW
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SET stream_targets TO v0;
SET
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SET stream_targets TO DEFAULT;
SET
postgres=# INSERT INTO stream (x) VALUES (1);
INSERT 0 1
postgres=# SELECT count FROM v0;
 count
-------
     3
(1 row)

postgres=# SELECT count FROM v1;
 count
-------
     2
(1 row)

Порядок поступления

По замыслу, Tantor PipelineDB использует порядок поступления arrival ordering для упорядочивания событий. Это означает, что отмечается время при поступлении события на сервер Tantor PipelineDB и событие получает дополнительный атрибут arrival_timestamp, содержащий эту временную метку. arrival_timestamp затем может быть использован в непрерывных представлениях с временным компонентом, типа скользящего окна.

Истечение срока события

После того, как событие поступает на сервер Tantor PipelineDB, ему присваивается небольшой битовый массив, представляющий все непрерывные представления, которые все еще должны прочитать событие. Когда непрерывное представление CONTINUOUS VIEW заканчивает чтение события, оно инвертирует один бит в битовом массиве. Когда все биты в битовой карте установлены в 1, событие удаляется и больше недоступно.