Потоки

Потоки - это абстракция, которая позволяет клиентам передавать данные через непрерывные представления. Строка потока, или просто событие, выглядит точно так же, как обычная строка таблицы, и интерфейс записи данных в потоки идентичен интерфейсу записи в таблицы. Однако семантика потоков фундаментально отличается от семантики таблиц.

А именно, события существуют в потоке только до тех пор, пока они не будут обработаны всеми непрерывными представлениями, которые читают из этого потока. Даже тогда пользователям все еще невозможно сделать SELECT из потоков. Потоки служат исключительно в качестве входных данных для непрерывных представлений.

Синтаксис создания потока аналогичен созданию таблицы:

CREATE STREAM stream_name ( [
        { column_name data_type [ COLLATE collation ] | LIKE parent_stream } [, ... ]
] )

stream_name

Имя потока, который нужно создать.

column_name

Имя столбца, который будет создан в новой таблице.

data_type

Тип данных столбца. Может включать спецификаторы массива. Для получения дополнительной информации о типах данных, поддерживаемых PipelineDB, см. Встроенные функции и PostgreSQL supported types .

COLLATE collation

Предложение COLLATE назначает правила сортировки для столбца (который должен иметь тип данных, поддерживающий сортировку). Если не указано иное, используется правило сортировки по умолчанию для соответствующих типов данных столбцов.

LIKE parent_table [ like_option … ]

Предложение LIKE указывает поток, из которого новый поток автоматически копирует все имена столбцов и типы данных.

Столбцы могут быть добавлены в потоки с помощью ALTER STREAM:

pipeline=# ALTER STREAM stream ADD COLUMN x integer; ALTER STREAM

Примечание

Столбцы не могут быть удалены из потоков.

Потоки могут быть удалены с помощью команды DROP STREAM. Ниже приведен пример создания простого непрерывного представления, которое берет данные из потока.

pipeline=# CREATE STREAM stream (x integer, y integer); CREATE STREAM pipeline=# CREATE CONTINUOUS VIEW v AS SELECT sum(x + y) FROM stream; CREATE CONTINUOUS VIEW

Запись в потоки

INSERT

Для записи в потоки используется упрощенная версия оператора PostgreSQL INSERT . Cинтаксис:

INSERT INTO stream_name ( column_name [, ...] ) VALUES ( expression [, ...] ) [, ...]

Рассмотрим несколько примеров.

Записи в потоки можно делать отдельно:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2);

INSERT INTO json_stream (payload) VALUES (
  '{"key": "value", "arr": [92, 12, 100, 200], "obj": { "nested": "value" } }'
);

Или их можно сгруппировать для улучшения производительности:

INSERT INTO stream (x, y, z) VALUES (0, 1, 2), (3, 4, 5), (6, 7, 8)
(9, 10, 11), (12, 13, 14), (15, 16, 17), (18, 19, 20), (21, 22, 23), (24, 25, 26);

Вставки в потоки также могут содержать произвольные выражения:

INSERT INTO geo_stream (id, coords) VALUES (42, ST_MakePoint(-72.09, 41.40));

INSERT INTO udf_stream (result) VALUES (my_user_defined_function('foo'));

INSERT INTO str_stream (encoded, location) VALUES
  (encode('encode me', 'base64'), position('needle' in 'haystack'));

INSERT INTO rad_stream (circle, sphere) VALUES
  (pi() * pow(11.2, 2), 4 / 3 * pi() * pow(11.2, 3));

-- Также поддерживаются подзапросы в потоки
INSERT INTO ss_stream (x) SELECT generate_series(1, 10) AS x;

INSERT INTO tab_stream (x) SELECT x FROM some_table;

Подготовленный INSERT

Записи в поток также работают с подготовленными вставками для снижения сетевой нагрузки:

PREPARE write_to_stream AS INSERT INTO stream (x, y, z) VALUES ($1, $2, $3);

EXECUTE write_to_stream(0, 1, 2);
EXECUTE write_to_stream(3, 4, 5);
EXECUTE write_to_stream(6, 7, 8);

COPY

И также можно использовать COPY для записи данных из файла в поток:

COPY stream (data) FROM '/some/file.csv'

Команда COPY может быть очень полезна для заполнения непрерывного представления из архивных данных задним числом. Вот как можно передавать сжатые архивные данные из S3 в PipelineDB:

s3cmd get s3://bucket/logfile.gz - | gunzip | pipeline -c "COPY stream (data) FROM STDIN"

Другие клиенты

Поскольку PipelineDB совместим с PostgreSQL, запись в потоки возможна из любого клиента, который работает с PostgreSQL (и, вероятно, из большинства клиентов, работающих с любой базой данных SQL), поэтому нет необходимости вручную создавать вставки в потоки. Для получения общего представления о том, как это работает, рекомендуем ознакомиться с разделом Клиенты.

Выходные потоки

Добавлено в версии 0.9.5.

Выходные потоки позволяют читать из потока инкрементальных изменений, внесенных в любом непрерывном представлении или из строк, выбранных непрерывным преобразованием. Выходные потоки это обычные потоки PipelineDB и, как таковые, могут быть прочитаны другими непрерывными представлениями или преобразованиями. К ним можно получить доступ с помощью функции output_of, вызванной на непрерывном представлении или преобразовании.

Для непрерывных представлений каждая строка в выходном потоке всегда содержит старый и новый кортеж, отражающий изменение, внесенное в основное непрерывное представление. Если изменение соответствует вставке в непрерывное представление, старый кортеж будет NULL. Если изменение соответствует удалению (в настоящее время это возможно только при выходе кортежа скользящего окна за пределы окна), новый кортеж будет NULL.

Давайте рассмотрим простой пример, чтобы проиллюстрировать некоторые из этих концепций на практике. Рассмотрим обычное непрерывное представление, которое просто суммирует данные одного столбца потока:

CREATE CONTINUOUS VIEW v_sum AS SELECT sum(x) FROM stream;

Теперь представьте себе ситуацию, когда мы хотим регистрировать каждое изменение суммы более чем на 10. Мы можем создать другое непрерывное представление, которое читает выходной поток v_sums, чтобы выполнить эту задачу:

CREATE CONTINUOUS VIEW v_deltas AS SELECT abs((new).sum - (old).sum) AS delta FROM output_of('v_sum') WHERE abs((new).sum - (old).sum) > 10;

… note:: старые и новые кортежи должны быть заключены в круглые скобки

Ознакомьтесь с разделом :ref`:ct-output-streams` для получения дополнительной информации о выходных потоках на непрерывных преобразованиях.

Выходные потоки на скользящих окнах

Для непрерывных представлений без скользящих окон в выходные потоки делается запись всякий раз, когда запись в поток приводит к изменению результата непрерывных представлений. Однако, поскольку результаты непрерывных представлений со скользящими окнами также зависят от времени, в их выходные потоки автоматически делается запись по мере изменения результатов с течением времени. То есть выходные потоки непрерывных представлений со скользящими окнами будут получать записи даже в том случае, если во входные потоки запись не делается.

Дельта-потоки

Добавлено в версии 0.9.7.

Помимо старых и новых кортежей, записываемых в выходной поток непрерывных представлений, для каждого инкрементального изменения, внесенного в непрерывное представление, также генерируется дельта-кортеж. Дельта-кортеж содержит значение, представляющее «разницу» между старыми и новыми кортежами. Для простых агрегатов, таких как sum, разница между старым и новым значением просто является скалярным значением (new).sum - (old).sum, подобно ручному подсчету в приведенном выше примере.

Давайте посмотрим, как это выглядит на самом деле:

pipeline=# CREATE CONTINUOUS VIEW v AS SELECT COUNT(\*) FROM stream; CREATE CONTINUOUS VIEW pipeline=# CREATE CONTINUOUS VIEW v_real_deltas AS SELECT (delta).sum FROM output_of('v'); CREATE CONTINUOUS VIEW pipeline=# INSERT INTO stream (x) VALUES (1); INSERT 0 1 pipeline=# SELECT * FROM v_real_deltas; sum
-----
   1
(1 row) pipeline=# INSERT INTO stream (x) VALUES (2); INSERT 0 1 pipeline=# INSERT INTO stream (x) VALUES (3); INSERT 0 1 pipeline=# SELECT * FROM v_real_deltas; sum
-----
   1
   2
   3
(3 rows)

Как видите, v_real_deltas записывает изменения, возникающие в результате каждой вставки. Но sum - очень простой пример. Магия дельта-потоков заключается в том, что они работают для всех агрегатов и могут быть использованы в сочетании с операцией combine для эффективной агрегации вывода непрерывных представлений на разных уровнях детализации/группировки.

Давайте рассмотрим более интересный пример. Предположим, у нас есть непрерывное представление, считающее количество уникальных пользователей в минуту:

CREATE CONTINUOUS VIEW uniques_1m AS SELECT minute(arrival_timestamp) AS ts, COUNT(DISTINCT user_id) AS uniques FROM s GROUP BY ts;

Для целей архивирования и повышения производительности нам может понадобиться уменьшить агрегацию этого непрерывного представления до детализации по часам после определенного периода времени. С агрегатом, типа COUNT(DISTINCT), очевидно, что мы не можем просто сложить полученные результаты за все минуты в часе, потому что будут дублирующиеся уникальные значения между исходными границами минут. Вместо этого мы можем объединить операцией combine уникальные дельта-значения, полученные из вывода непрерывного представления на уровне минуты:

CREATE CONTINUOUS VIEW uniques_hourly AS SELECT hour((new).ts) AS ts, combine((delta).uniques) AS uniques FROM output_of('uniques_1m') GROUP BY ts;

Непрерывное представление uniques_hourly теперь будет содержать почасовые уникальные строки, которые содержат точно такую же информацию, как если бы все исходные необработанные значения были агрегированы по часам. Но вместо дублирования работы, выполненной при чтении необработанных событий, мы просто дополнительно агрегируем вывод на уровне минут.

stream_targets

Иногда может потребоваться обновить только выбранный набор непрерывных представлений при записи в поток, например, при повторном воспроизведении исторических данных во вновь созданном непрерывном представлении. Можно использовать параметр конфигурации stream_targets, чтобы указать непрерывные представления, которые должны обновляться при записи в потоки. Укажите stream_targets в разделенном запятыми списке непрерывных представлений, которые необходимо обновить при вставке в потоки.

pipeline=# CREATE CONTINUOUS VIEW v0 AS SELECT COUNT(*) FROM stream; CREATE CONTINUOUS VIEW pipeline=# CREATE CONTINUOUS VIEW v1 AS SELECT COUNT(*) FROM stream; CREATE CONTINUOUS VIEW pipeline=# INSERT INTO stream (x) VALUES (1); INSERT 0 1 pipeline=# SET stream_targets TO v0; SET pipeline=# INSERT INTO stream (x) VALUES (1); INSERT 0 1 pipeline=# SET stream_targets TO DEFAULT; SET pipeline=# INSERT INTO stream (x) VALUES (1); INSERT 0 1 pipeline=# SELECT count FROM v0; count
-------
     3
(1 row)

pipeline=# SELECT count FROM v1; count
-------
     2
(1 row)

pipeline=#

Порядок поступления

По замыслу, PipelineDB использует порядок поступления arrival ordering для упорядочивания событий. Это означает, что отмечается время при поступлении события на сервер PipelineDB и событие получает дополнительный атрибут arrival_timestamp, содержащий эту временную метку. arrival_timestamp затем может быть использован в непрерывных представлениях Непрерывные представления с временным компонентом, типа скользящего окна Скользящие окна.

Истечение срока события

После того, как событие поступает на сервер PipelineDB, ему присваивается небольшой битовый массив, представляющий все непрерывные представления Непрерывные представления, которые все еще должны прочитать событие. Когда непрерывное представление CONTINUOUS VIEW заканчивает чтение события, оно инвертирует один бит в битовом массиве. Когда все биты в битовой карте установлены в 1, событие удаляется и больше недоступно.


Теперь, когда вы знаете, что такое непрерывные представления Непрерывные представления и как писать в потоки, пришло время узнать об обширных встроенных функциях PipelineDB!