Интеграции
Apache Kafka
Tantor PipelineDB поддерживает ввод данных из топиков Kafka в потоки. Функции содержатся в расширении pipeline_kafka. Внутри pipeline_kafka использует средства PostgreSQLs COPY для преобразования сообщений Kafka в строки, которые понимает Tantor PipelineDB.
Примечание
Расширение pipeline_kafka официально поддерживается, но не поставляется вместе с пакетами Tantor PipelineDB и, следовательно, должно быть установлено отдельно. Репозиторий для расширения находится по ссылке. Инструкции по сборке и установке расширения можно найти в файле README.md.
pipeline_kafka использует общую память для синхронизации состояния между фоновыми процессами, поэтому его необходимо загружать как общую библиотеку. Это можно сделать, добавив следующую строку в файл pipelinedb.conf
. Если вы уже загружаете некоторые общие библиотеки, просто добавьте pipeline_kafka
в список, через запятую.
shared_preload_libraries = pipeline_kafka
Теперь вы можете загрузить расширение в базу данных:
postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION
Прежде чем начать использовать pipeline_kafka, необходимо добавить брокер для развертывания Kafka.
pipeline_kafka.add_broker ( hostname text )
hostname
— это строка в форматеhost[:port]
. Можно добавить несколько брокеров, вызвав pipeline_kafka.add_broker для каждого хоста.
Потребление сообщений
pipeline_kafka.consume_begin ( topic text, stream text, format := „text“, delimiter := E“\t“, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )
Запускает параллельно несколько фоновых рабочих процессов, каждый из которых считывает сообщения из указанного топика Kafka в указанный поток. Целевой поток должен быть создан заранее с помощью
CREATE FOREIGN TABLE
. Все партиции указанного топика будут равномерно распределены по каждому рабочему процессу.Необязательные аргументы
format
,delimiter
,escape
иquote
аналогичны опциямFORMAT
,DELIMITER
ESCAPE
иQUOTE
для команды PostgreSQLs COPY, за исключением того, что pipeline_kafka поддерживает дополнительный формат json. Формат json интерпретирует каждое сообщение Kafka как объект JSON.
batchsize
контролирует параметрbatch_size
, передаваемый клиенту Kafka. ЦиклCOPY
и коммит после буферизации сообщенийbatchsize
выполняется принудительно.
maxbytes
контролирует параметрfetch.message.max.bytes
, передаваемый клиенту Kafka. ЦиклCOPY
и коммит после буферизации данныхmaxbytes
выполняется принудительно.
start_offset
указывает смещение, с которого начинается чтение партиций топика Kafka.
pipeline_kafka непрерывно сохраняет в базе данных текущее смещение, до которого были прочитаны данные. Если start_offset
равен NULL
, сохранение начинается с сохраненного смещения или с конца партиции, если сохраненного смещения нет. При значении start_offset
равному -1
чтение начинается с конца каждой партиции, а -2
— с начала каждой партиции. Использование любого другого значения start_offset
не рекомендуется, поскольку смещения в разных партициях не взаимосвязаны.
pipeline_kafka.consume_begin ( )
То же, что и выше, но запускает все ранее созданные потребители вместо конкретной пары поток-топик.
pipeline_kafka.consume_end ( topic text, stream text )
Завершает фоновые процессы потребителя для указанной пары поток-топик.
pipeline_kafka.consume_end ( )
То же, что и выше, но завершает все процессы потребителя.
Генерация сообщений
pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )
Производит одно сообщение
message
и отправляет в целевой топикtopik
. Оба параметраpartition
иkey
являются необязательными. По умолчанию партиция остается неназначенной, поэтому брокер сам решает, в какую партицию отправить сообщение в зависимости от функции партицирования топика. Если необходимо отправить сообщение в конкретную партицию, укажите его какinteger
.key
— это аргументbytea
, который будет использоваться в качестве ключа для функции разделения.
pipeline_kafka.emit_tuple ( topic, partition, key )
Это триггерная функция, которую можно использовать для отправки кортежей в поток Kafka в формате JSON. Ее можно использовать только для триггера
AFTER INSERT OR UPDATE
иFOR EACH ROW
. В случаеUPDATE
, отправляются новые обновленные кортежи. Параметрtopik
должен быть указан, в то время какpartition
иkey
являются необязательными. Поскольку это триггерная функция, все аргументы должны быть переданы как строковые литералы, и нет способа указать именованные аргументы. Если вы хотите указать толькоtopik
иkey
, используйте-1
в качестве раздела, при этом партиция останется без назначения.key
— это имя столбца в передаваемом кортеже, значение которого должно использоваться в качестве ключа партиции.
Метаданные
pipeline_kafka использует несколько таблиц чтобы точно отслеживать собственное состояние при перезапуске системы:
pipeline_kafka.consumers
Хранит метаданные для каждого потребителя потока-топика, созданного pipeline_kafka.consume_begin.
pipeline_kafka.brokers
Хранит все брокеры Kafka, к которым могут подключаться потребители.
pipeline_kafka.offsets
Хранит смещение топиков Kafka, чтобы потребители могли начать чтение сообщений с того места, где они остановились перед завершением или перезапуском системы.
Amazon Kinesis
Tantor PipelineDB также поддерживает ввод данных из потоков Amazon Kinesis. Данная функциональность предоставляется расширением pipeline_kinesis. Внутри расширение управляет фоновыми рабочими процессами, которые потребляют данные, используя AWS SDK, и копируют их в потоки pipeline.
Репозиторий для расширения находится по ссылке. Инструкции по сборке и установке расширения можно найти в файле README.md.
Для активации расширения его необходимо явно загрузить:
postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION
Для запуска поглощения данных необходимо сначала сообщить конвейеру данных, где и как получить данные Kinesis, настроив конечную точку:
pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )
name
— уникальный идентификатор для конечной точки.
region
— строка, идентифицирующая регион AWS, например,us-east-1
илиus-west-2
.
credfile
— необязательный параметр, который позволяет переопределить расположение файла по умолчанию для учетных данных AWS.
url
— необязательный параметр, который позволяет использовать другой сервер Kinesis (не AWS). В основном применяется для тестирования с локальными серверами kinesis, типа kinesalite.
Потребление сообщений
pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := „text“, delimiter text := E“\t“, quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )
Запускает группу логических потребителей, которая потребляет сообщения kinesis из потока
stream
kinesis на конечной точкеendpoint
и копирует их в отношениеrelation
потока конвейера данных.Параллельная обработка данных
parallelism
используется для указания количества фоновых рабочих процессов, которые должны использоваться на каждого потребителя для балансировки нагрузки. Примечание: значение не нужно устанавливать на количество шардов, поскольку расширение по внутреннему устройству потоковое. Значения по умолчанию1
достаточно, пока потребитель не начнет отставать.
format
,delimiter
,escape
иquote
— необязательные параметры, используемые для управления форматом скопированных строк, как в PostgreSQLs COPY.
batchsize
передается в AWS SDK и управляет параметромLimit
, используемым в Kinesis GetRecords.
start_offset
используется для управления позицией потока, с которой расширение начинает читать данные. При значенииstart_offset
равному-1
чтение начинается с конца потока, а-2
— с начала. Внутренне они сопоставляются сTRIM_HORIZON
иLATEST
. Смотрите Kinesis GetShardIterator для получения дополнительной информации.
pipeline_kinesis.consume_end (endpoint text, stream text, relation text)
Завершает все фоновые рабочие процессы для конкретного потребителя.
pipeline_kinesis.consume_begin()
Запускает все ранее созданные потребители.
pipeline_kinesis.consume_end()
Завершает все фоновые рабочие процессы для всех ранее запущенных потребителей.
Метаданные
pipeline_kinesis использует несколько таблиц, чтобы точно отслеживать собственное состояние при перезапуске системы:
pipeline_kinesis.endpoints
Хранит метаданные для каждой конечной точки, созданной kinesis_add_endpoint.
pipeline_kinsesis.consumers
Хранит метаданные для каждого потребителя, созданного kinesis_consume_begin.
pipeline_kinsesis.seqnums
Хранит метаданные для каждого потребителя на уровне шарда. А именно, seqnums.