Интеграции

Apache Kafka

Tantor PipelineDB поддерживает ввод данных из топиков Kafka в потоки. Функции содержатся в расширении pipeline_kafka. Внутри pipeline_kafka использует средства PostgreSQLs COPY для преобразования сообщений Kafka в строки, которые понимает Tantor PipelineDB.

Примечание

Расширение pipeline_kafka официально поддерживается, но не поставляется вместе с пакетами Tantor PipelineDB и, следовательно, должно быть установлено отдельно. Репозиторий для расширения находится по ссылке. Инструкции по сборке и установке расширения можно найти в файле README.md.

pipeline_kafka использует общую память для синхронизации состояния между фоновыми процессами, поэтому его необходимо загружать как общую библиотеку. Это можно сделать, добавив следующую строку в файл pipelinedb.conf. Если вы уже загружаете некоторые общие библиотеки, просто добавьте pipeline_kafka в список, через запятую.

shared_preload_libraries = pipeline_kafka

Теперь вы можете загрузить расширение в базу данных:

postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION

Прежде чем начать использовать pipeline_kafka, необходимо добавить брокер для развертывания Kafka.

pipeline_kafka.add_broker ( hostname text )

hostname — это строка в формате host[:port]. Можно добавить несколько брокеров, вызвав pipeline_kafka.add_broker для каждого хоста.

Потребление сообщений

pipeline_kafka.consume_begin ( topic text, stream text, format := „text“, delimiter := E“\t“, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )

Запускает параллельно несколько фоновых рабочих процессов, каждый из которых считывает сообщения из указанного топика Kafka в указанный поток. Целевой поток должен быть создан заранее с помощью CREATE FOREIGN TABLE. Все партиции указанного топика будут равномерно распределены по каждому рабочему процессу.

Необязательные аргументы format, delimiter, escape и quote аналогичны опциям FORMAT, DELIMITER ESCAPE и QUOTE для команды PostgreSQLs COPY, за исключением того, что pipeline_kafka поддерживает дополнительный формат json. Формат json интерпретирует каждое сообщение Kafka как объект JSON.

batchsize контролирует параметр batch_size, передаваемый клиенту Kafka. Цикл COPY и коммит после буферизации сообщений batchsize выполняется принудительно.

maxbytes контролирует параметр fetch.message.max.bytes, передаваемый клиенту Kafka. Цикл COPY и коммит после буферизации данных maxbytes выполняется принудительно.

start_offset указывает смещение, с которого начинается чтение партиций топика Kafka.

pipeline_kafka непрерывно сохраняет в базе данных текущее смещение, до которого были прочитаны данные. Если start_offset равен NULL, сохранение начинается с сохраненного смещения или с конца партиции, если сохраненного смещения нет. При значении start_offset равному -1 чтение начинается с конца каждой партиции, а -2 — с начала каждой партиции. Использование любого другого значения start_offset не рекомендуется, поскольку смещения в разных партициях не взаимосвязаны.

pipeline_kafka.consume_begin ( )

То же, что и выше, но запускает все ранее созданные потребители вместо конкретной пары поток-топик.

pipeline_kafka.consume_end ( topic text, stream text )

Завершает фоновые процессы потребителя для указанной пары поток-топик.

pipeline_kafka.consume_end ( )

То же, что и выше, но завершает все процессы потребителя.

Генерация сообщений

pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )

Производит одно сообщение message и отправляет в целевой топик topik. Оба параметра partition и key являются необязательными. По умолчанию партиция остается неназначенной, поэтому брокер сам решает, в какую партицию отправить сообщение в зависимости от функции партицирования топика. Если необходимо отправить сообщение в конкретную партицию, укажите его как integer. key — это аргумент bytea, который будет использоваться в качестве ключа для функции разделения.

pipeline_kafka.emit_tuple ( topic, partition, key )

Это триггерная функция, которую можно использовать для отправки кортежей в поток Kafka в формате JSON. Ее можно использовать только для триггера AFTER INSERT OR UPDATE и FOR EACH ROW. В случае UPDATE, отправляются новые обновленные кортежи. Параметр topik должен быть указан, в то время как partition и key являются необязательными. Поскольку это триггерная функция, все аргументы должны быть переданы как строковые литералы, и нет способа указать именованные аргументы. Если вы хотите указать только topik и key, используйте -1 в качестве раздела, при этом партиция останется без назначения. key — это имя столбца в передаваемом кортеже, значение которого должно использоваться в качестве ключа партиции.

Метаданные

pipeline_kafka использует несколько таблиц чтобы точно отслеживать собственное состояние при перезапуске системы:

pipeline_kafka.consumers

Хранит метаданные для каждого потребителя потока-топика, созданного pipeline_kafka.consume_begin.

pipeline_kafka.brokers

Хранит все брокеры Kafka, к которым могут подключаться потребители.

pipeline_kafka.offsets

Хранит смещение топиков Kafka, чтобы потребители могли начать чтение сообщений с того места, где они остановились перед завершением или перезапуском системы.

Amazon Kinesis

Tantor PipelineDB также поддерживает ввод данных из потоков Amazon Kinesis. Данная функциональность предоставляется расширением pipeline_kinesis. Внутри расширение управляет фоновыми рабочими процессами, которые потребляют данные, используя AWS SDK, и копируют их в потоки pipeline.

Репозиторий для расширения находится по ссылке. Инструкции по сборке и установке расширения можно найти в файле README.md.

Для активации расширения его необходимо явно загрузить:

postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION

Для запуска поглощения данных необходимо сначала сообщить конвейеру данных, где и как получить данные Kinesis, настроив конечную точку:

pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )

name — уникальный идентификатор для конечной точки.

region — строка, идентифицирующая регион AWS, например, us-east-1 или us-west-2.

credfile — необязательный параметр, который позволяет переопределить расположение файла по умолчанию для учетных данных AWS.

url — необязательный параметр, который позволяет использовать другой сервер Kinesis (не AWS). В основном применяется для тестирования с локальными серверами kinesis, типа kinesalite.

Потребление сообщений

pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := „text“, delimiter text := E“\t“, quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )

Запускает группу логических потребителей, которая потребляет сообщения kinesis из потока stream kinesis на конечной точке endpoint и копирует их в отношение relation потока конвейера данных.

Параллельная обработка данных parallelism используется для указания количества фоновых рабочих процессов, которые должны использоваться на каждого потребителя для балансировки нагрузки. Примечание: значение не нужно устанавливать на количество шардов, поскольку расширение по внутреннему устройству потоковое. Значения по умолчанию 1 достаточно, пока потребитель не начнет отставать.

format, delimiter, escape и quote — необязательные параметры, используемые для управления форматом скопированных строк, как в PostgreSQLs COPY.

batchsize передается в AWS SDK и управляет параметром Limit, используемым в Kinesis GetRecords.

start_offset используется для управления позицией потока, с которой расширение начинает читать данные. При значении start_offset равному -1 чтение начинается с конца потока, а -2 — с начала. Внутренне они сопоставляются с TRIM_HORIZON и LATEST. Смотрите Kinesis GetShardIterator для получения дополнительной информации.

pipeline_kinesis.consume_end (endpoint text, stream text, relation text)

Завершает все фоновые рабочие процессы для конкретного потребителя.

pipeline_kinesis.consume_begin()

Запускает все ранее созданные потребители.

pipeline_kinesis.consume_end()

Завершает все фоновые рабочие процессы для всех ранее запущенных потребителей.

Метаданные

pipeline_kinesis использует несколько таблиц, чтобы точно отслеживать собственное состояние при перезапуске системы:

pipeline_kinesis.endpoints

Хранит метаданные для каждой конечной точки, созданной kinesis_add_endpoint.

pipeline_kinsesis.consumers

Хранит метаданные для каждого потребителя, созданного kinesis_consume_begin.

pipeline_kinsesis.seqnums

Хранит метаданные для каждого потребителя на уровне шарда. А именно, seqnums.