Непрерывные агрегатные функции

Одной из основных целей PipelineDB является **обеспечение высокопроизводительной непрерывной агрегации данных**и поэтому агрегатные функции являются центральным компонентом инструмента PipelineDB. Непрерывные агрегатные функции могут быть очень мощным инструментом - в самом общем смысле они позволяют поддерживать постоянное количество данных, хранимых в PipelineDB, относительно количества данных, которые были переданы через него. Это может обеспечить устойчивую и очень высокую пропускную способность данных на аппаратном обеспечения с минимальными характеристиками.

Непрерывные агрегаты инкрементально обновляются в режиме реального времени при чтении новых событий непрерывным представлением, частью которого они являются. Для простых агрегатных функций, таких как count и sum, легко посмотреть, как результаты инкрементально обновляются - просто добавьте новое значение к существующему результату.

Но для более сложных агрегатов, таких как avg, stddev, percentile_cont, и т. д., требуется более продвинутая инфраструктура для поддержки эффективных инкрементальных обновлений, и PipelineDB прозрачно обрабатывает данные по сложным сценариям.

Ниже вы найдете описание всех агрегатных функций, которые поддерживаются в PipelineDB. Некоторые из них ведут себя немного иначе, чем их стандартные аналоги, что обусловлено необходимостью оптимизировать работу с бесконечными потоками данных. К таким агрегатам идет пояснение о том, в чем именно отличается их поведение.

Примечание

Для получения информации по агрегатам, у которых есть эквиваленты в PostgreSQL и PostGIS, можно изучить документацию по агрегатам PostgreSQL_ `или `PostGIS_.


Фильтры Блума

bloom_agg ( expression )

Добавляет все входные значения в фильтр :ref:`bloom-filter

bloom_agg ( expression , p , n )

Добавляет все входные значения в фильтр Блума и устанавливает его размер в соответствии с заданными параметрами. p - желаемый процент ложного срабатывания, а n - ожидаемое количество добавляемых уникальных элементов

bloom_union_agg ( bloom filter )

Объединяет все входные фильтры Блума в один фильтр Блума, содержащий всю информацию из входных фильтров Блума.

bloom_intersection_agg ( bloom filter )

Берет пересечение всех входных фильтров Блума, и объединяет их в один фильтр Блума, содержащий только информацию, общую для всех входных фильтров Блума.

См. :ref:bloom-funcs для получения информации о функционале, который можно использовать для работы с фильтрами Блума.

Агрегаты Count-Min Sketch

cmsketch_agg ( expression )

Добавляет все входные значения в Count-Min Sketch.

cmsketch_agg ( expression, epsilon, p )

То же, что и выше, но принимает epsilon и p в качестве параметров для базового cmsketch. epsilon определяет допустимую погрешность cmsketch и по умолчанию равен 0.002 (0.2%). p определяет достоверность и по умолчанию равен 0.995 (99.5%). Уменьшение значения epsilon и p приведет к более мелким структурам cmsketch, и наоборот.

cmsketch_merge_agg ( count-min sketch )

Объединяет все входные структуры Count-min sketch в одну, содержащую всю информацию входных структур Count-min sketch.

См. Функции частоты для получения информации о функциях, которые можно использовать для работы со структурами Count-Min sketch.

Агрегаты FSS (Filtered-Space Saving)

fss_agg ( expression , k )

Добавляет все входные значения в структуру данных Filtered-Space Saving Top-K, размер которой равен заданному k, увеличивая счетчик каждого значения на 1 каждый раз, когда добавляется значение.

fss_agg_weighted (expression, k, weight )

Добавляет все входные значения в FSS, размер которого соответствует заданному k, увеличивая счетчик каждого значения на заданную единицу каждый раз, когда добавляется значение.

fss_merge_agg ( fss )

Объединяет все входные структуры FSS в одну.

См. fss-funcs для получения информации о функциях, которые можно использовать для работы с объектами FSS (Filtered-Space Saving).

Агрегаты HyperLogLog

hll_agg ( expression )

Добавляет все входные значения в HyperLogLog.

hll_agg ( expression, p )

Добавляет все входные значения в HyperLogLog с заданным p. Большее значение p уменьшает погрешность HyperLogLogs за счет увеличения размера.

hll_union_agg ( hyperloglog )

Берет все входные алгоритмы HyperLogLogs, и создает один HyperLogLog, содержащий всю информацию входных HyperLogLogs.

См. Функции HyperLogLog для получения информации о функциях, которые можно использовать для работы с объектами HyperLogLog.

Агрегаты T-Digest

tdigest_agg ( expression )

Добавляет все входные значения в T-Digest.

tidgest_merge_agg ( tdigest )

Объединяет все входные структуры T-Digest в одну, содержащую всю информацию по всем входным структурам T-Digest.

См. Функции распределения для получения информации о функциях, которые можно использовать для работы с объектами T-Digest.

Различные агрегаты

bucket_agg ( expression , bucket_id )

Добавляет 4-байтовые хеши каждого входного выражения в бакет с заданным bucket_id. Каждый хеш может быть указан только один раз в одном бакете в любой момент времени. Поэтому бакеты могут рассматриваться как уникальные наборы хешей входных выражений.

bucket_agg ( expression , bucket_id , timestamp )

То же, что и выше, но позволяет использовать выражение временной метки timestamp для определения порядка записей в бакете. То есть, только последняя запись значения будет перемещена в другой бакет.

См. :ref:`misc-funcs `для получения информации о функциях, которые можно использовать для работы с объектами bucket_agg.

exact_count_distinct ( expression )

Подсчитывает точное количество уникальных значений для заданного выражения. Поскольку count distinct, используемый в непрерывных представлениях, неявно использует HyperLogLog для оптимизации производительности, exact_count_distinct можно использовать, когда небольшая погрешность, присущая HyperLogLog, недопустима.

… important:: exact_count_distinct должен хранить все наблюдаемые уникальные значения, чтобы определить уникальность, поэтому его не рекомендуется использовать, если ожидается много уникальных значений.

first_values ( n ) WITHIN GROUP (ORDER BY sort_expression)

Агрегат с упорядоченным набором, который хранит первые n значений, упорядоченные по указанному выражению для сортировки.

Примечание

См. также: Встроенные функции PipelineDB, где разъясняются некоторые функции PipelineDB для работы с фильтрами Блум, структурами Count-min sketch, HyperLogLogs и T-Digests, не относящиеся к агрегатам. Также ознакомьтесь с Вероятностные структуры данных и алгоритмы для получения дополнительной информации о том, что это такое и как его использовать.

keyed_max ( key, value )

Возвращается значение, связанное с «самым высоким» ключом.

keyed_min ( key, value )

Возвращает значение, связанное с «самым низким» ключом.

set_agg ( expression )

Добавляет все входные значения в набор.

См. Различные функции для получения информации о функциях, которые можно использовать для работы с наборами.


Операция combine

Поскольку PipelineDB может инкрементально обновлять агрегированные значения, у него есть возможность объединять существующие агрегаты, используя больше информации, а не только их текущие исходные значения. Например, объединение нескольких средних значений не сводится просто к вычислению среднего значения из средних значений. Их веса должны быть учтены.

Для этого типа операций, в PipelineDB есть специальный агрегат combine. Описание агрегата:

combine ( aggregate column )

Берет агрегированный столбец и объединяет все значения в одно значение, как если бы все отдельные входные агрегаты были агрегированы один раз.

Примечание

combine работает только с агрегированными столбцами, которые принадлежат непрерывным представлениям.

Давайте рассмотрим на примере:

pipeline=# CREATE CONTINUOUS VIEW v AS SELECT g::integer, AVG(x::integer) FROM stream GROUP BY g; CREATE CONTINUOUS VIEW pipeline=# INSERT INTO stream (g, x) VALUES (0, 10), (0, 10), (0, 10), (0, 10), (0, 10); INSERT 0 5 pipeline=# INSERT INTO stream (g, x) VALUES (1, 20); INSERT 0 1 pipeline=# SELECT * FROM v; g | avg ---+--------------------- 0 | 10.0000000000000000 1 | 20.0000000000000000 (2 rows)

pipeline=# SELECT avg(avg) FROM v; avg
---------------------
 15.0000000000000000 (1 row)

pipeline=# -- Но не учитывался тот факт, что значение 10 весит намного больше, pipeline=# -- потому что оно было вставлено 5 раз, в то время как 20 было вставлено только один раз. pipeline=# -- combine() учитывает этот вес pipeline=# pipeline=# SELECT combine(avg) FROM v; combine
---------------------
 11.6666666666666667 (1 row)

pipeline=# -- Вот и все! Это та же средняя величина, которую мы бы получили, если бы запустили pipeline=# -- одно среднее значение всех 6 вышеуказанных вставленных значений, но нам понадобилось всего две строки, чтобы это сделать.

CREATE AGGREGATE

Помимо встроенных в PipelineDB агрегатов, пользовательские агрегаты также работают с непрерывными представлениями. Пользовательские комбинируемые агрегаты могут быть созданы с помощью команды PostgreSQL CREATE AGGREGATE. Чтобы сделать агрегат комбинируемым, необходимо указать combinefunc. combineinfunc и transoutfunc являются необязательными:

CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (
        ...
        COMBINEFUNC = combinefunc,
        [ , COMBINEINFUNC = combineinfunc ]
        [ , TRANSOUTFUNC = transoutfunc ]
)

combinefunc ( stype, stype )

Функция, которая принимает два переходных состояния и возвращает одно переходное состояние. Вот пример функции combine для получения среднего значения для целых чисел avg:

CREATE FUNCTION avg_combine(state integer[], incoming integer[]) RETURNS integer[] AS $$
BEGIN
        RETURN ARRAY[state[1] + incoming[1], state[2] + incoming[2]];
END;
$$
LANGUAGE plpgsql

Переходное состояние представлено в виде двухмерного массива, содержащего количество элементов и их сумму, которые можно использовать для вычисления окончательного результата.

combineinfunc ( any )

Функция, которая преобразовывает переходное состояние агрегатов из внешнего во внутреннее представление. Преобразование необходимо только в том случае, если тип переходного состояния не является нативным.

transoutfunc ( stype )

Функция, которая преобразовывает переходное состояние агрегатов из внутреннего во внешнее представление, которое может быть сохранено в ячейке таблицы. Преобразование необходимо только в том случае, если тип переходного состояния не является нативным типом.


Общие агрегаты

array_agg ( expression )

Входные значения, включая нули, объединенные в массив

avg ( expression )

Среднее всех входных значений

bit_and ( expression )

Побитовое И (AND) всех ненулевых входных значений или null, если их нет

bit_or ( expression )

Побитовое ИЛИ (OR) всех ненулевых входных значений или null, если их нет

bool_and ( expression )

True, если все входные значения true, в противном случае - false.

bool_or ( expression )

True, если хотя бы одно входное значение true, в противном случае - false.

count ( * )

Количество строк ввода

count ( DISTINCT expression)

Количество строк, для которых выражение является уникальным.

Примечание

Подсчет точного числа выражений в бесконечном потоке потребовал бы бесконечной памяти, поэтому непрерывные представления используют HyperLogLog для выполнения подсчета точных значений с использованием постоянного объема данных и за постоянное время за счет допущения небольшой погрешности. Исходя из опыта, реализация HyperLogLog в PipelineDB имеет погрешность около 0,81%. Например, count distinct может выдать 1008, когда фактическое количество уникальных выражений было 1000.

count ( expression )

Количество строк, для которых выражение не null.

every ( expression )

Эквивалент bool_and

json_agg ( expression )

Агрегирует значения как массив JSON

json_object_agg ( key, value )

Агрегирует пары ключ-значение в виде JSON-объекта

jsonb_agg ( expression )

Агрегирует значения как массив JSONB

jsonb_object_agg ( key, value )

Агрегирует пары ключ-значение в виде объекта JSONB

max ( expression )

Максимальное значение выражения среди всех входных значений

min ( expression )

Минимальное значение выражения среди всех входных значений

string_agg ( expression, delimiter )

Входные значения объединены в строку, разделенную разделителем

sum ( expression )

Сумма выражения по всем входным значениям


Статистические агрегаты

corr ( y, x )

Коэффициент корреляции

covar_pop ( y, x )

Ковариация совокупности

covar_samp ( y, x )

Ковариация выборки

regr_avgx ( y, x )

Среднее значение независимой переменной (sum(x)/N)

regr_avgy ( y, x )

Среднее значение независимой переменной (sum(y)/N)

regr_count ( y, x )

Количество строк ввода, в которых оба выражения не null.

regr_intercept ( y, x )

Точка пересечения c осью Y лини, полученной методом наименьших квадратов, по данным (x, y)

regr_r2 ( y, x )

Квадрат коэффициента корреляции

regr_slope ( y, x )

Наклон линии, полученной методом наименьших квадратов по данным (x, y)

regr_sxx ( y, x )

sum(X^2) - sum(X)^2/N – сумма квадратов независимой переменной

regr_sxy ( y, x )

sum(X*Y) - sum(X) * sum(Y)/N – сумма произведений независимых и зависимых переменных

regr_syy ( y, x )

sum(Y^2) - sum(Y)^2/N – сумма квадратов независимой переменной

stddev ( expression )

Стандартное отклонение по выборке входных значений

stddev_pop ( expression )

Стандартное отклонение по совокупности входных значений

variance ( expression )

Дисперсия для выборки входных значений (квадрат стандартного отклонения по выборке)

var_pop ( expression )

Дисперсия для совокупности входных значений (квадрат стандартного отклонения по совокупности)


Агрегаты с упорядоченным набором

Агрегаты с упорядоченным набором применяют сортировку к вводу для получения соответствующих результатов, поэтому они используют предложение WITHIN GROUP. Синтаксис следующий:

aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )

Давайте посмотрим на несколько примеров.

Вычислите 99-й процентиль значения:

SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value) FROM some_table;

Или с непрерывным представлением:

CREATE CONTINUOUS VIEW percentile AS
SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value::float8)
FROM some_stream;

percentile_cont ( fraction )

Непрерывный процентиль: возвращает значение, соответствующее указанной дроби по порядку, интерполируя соседние входные значения при необходимости

percentile_cont ( array of fractions )

Множественный непрерывный процентиль: возвращает массив результатов, соответствующих форме параметра дроби, при этом каждый элемент не-null заменяется значением, соответствующим этому процентилю

Примечание

Вычисление процентилей на бесконечных потоках потребовало бы бесконечной памяти, поэтому обе формы percentile_cont, используемые непрерывными представлениями, используют T-Digest как способ оценки процентилей с очень высокой степенью точности. В общем, процентили в непрерывных представлениях более точны, если они ближе к верхним или нижним границам [0, 1).


Агрегатные функции с гипотетическим набором

Агрегатные функции с гипотетическим набором берут выражение и вычисляют какое-то значение в контексте набора входных строк. Например, rank(2) вычисляет code:rank2 с учетом того, чем в итоге окажутся входные строки.

Агрегаты с гипотетическим набором используют WITHIN GROUP для определения входных строк. Синтаксис следующий:

aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )

Вот пример использования агрегатной функции с гипотетическим набором в непрерывном представлении:

CREATE CONTINUOUS VIEW continuous_rank AS
SELECT rank(42) WITHIN GROUP (ORDER BY value::float8)
FROM some_stream;

Это непрерывное представление будет непрерывно обновлять ранг 42, учитывая все события, которые оно прочитало.

rank ( arguments )

Ранг гипотетической строки с пропусками вместо дублирующихся строк

dense_rank ( arguments )

Ранг гипотетической строки без пропусков

Примечание

Вычисление гипотетического плотного ранга значения при наличии бесконечного потока значений потребовало бы бесконечной памяти, поэтому непрерывные представления используют HyperLogLog для выполнения этой операции за постоянное время и используя постоянный объем данных, за счет небольшой погрешности. Исходя из опыта, реализация HyperLogLog в PipelineDB проходит с погрешностью около 0.2%. Другими словами, плотный ранг (1000) в непрерывном представлении может показать 998, когда фактическое количество уникальных значений с более низким рангом было 1000.

percent_rank ( arguments )

Относительный ранг гипотетической строки, варьирующийся от 0 до 1

cume_dist ( arguments )

Относительный ранг гипотетической строки, варьирующийся от 1/N до 1.


Неподдерживаемые агрегатные функции

mode ( )

Будущие версии PipelineDB будут поддерживать реализацию алгоритма оценки режима в режиме онлайн, но на данный момент данный функционал не поддерживается

percentile_disc ( arguments )

Для входного процентиля (например, 0,99), percentile_disc возвращает самое первое значение во входном наборе, которое находится в этом процентиле. Это требует фактической сортировки входного набора, что очевидно непрактично при работе с бесконечным потоком, и даже не позволяет использовать высокоточный алгоритм оценки, аналогичный используемому для percentile_cont.

xmlagg ( xml )

:(

aggregate_name (DISTINCT expression)

Только агрегатная функция count поддерживается с выражением DISTINCT, как указано выше в разделе «Общие агрегаты». В будущих версиях мы сможем использовать Фильтр Блума чтобы разрешить выражения DISTINCT для всех агрегатных функций.