Интеграции
Apache Kafka
PipelineDB поддерживает ввод данных из топиков Kafka в потоки. Функции содержатся в расширении pipeline_kafka. Внутри pipeline_kafka использует средства PostgreSQLs COPY для преобразования сообщений Kafka в строки, которые понимает PipelineDB.
Примечание
Расширение pipeline_kafka официально поддерживается, но не поставляется вместе с пакетами PipelineDB и, следовательно, должно быть установлено отдельно. Репозиторий для расширения находится здесь. Инструкции по сборке и установке расширения можно найти в файле README.md
.
pipeline_kafka использует общую память для синхронизации состояния между фоновыми процессами, поэтому его необходимо загружать как общую библиотеку. Это можно сделать, добавив следующую строку в файл pipelinedb.conf
. Если вы уже загружаете некоторые общие библиотеки, просто добавьте pipeline_kafka
в список, через запятую.
shared_preload_libraries = pipeline_kafka
Теперь вы можете загрузить расширение в базу данных:
# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION
Прежде чем начать использовать pipeline_kafka, необходимо добавить брокер для развертывания Kafka.
pipeline_kafka.add_broker ( hostname text )
hostname - это строка в формате <host>[:<port>]
Можно добавить несколько брокеров, вызвав pipeline_kafka.add_broker для каждого хоста.
Потребление сообщений
pipeline_kafka.consume_begin ( topic text, stream text, format := „text“, delimiter := E“\t“, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )
Запускает параллельно несколько фоновых рабочих процессов, каждый из которых считывает сообщения из указанного топика Kafka в указанный поток. Целевой поток должен быть создан заранее с помощью CREATE STREAM
. Все партиции указанного топика будут равномерно распределены по каждому рабочему процессу.
Необязательные аргументы format, delimiter, escape и quote аналогичны опциям FORMAT
, DELIMITER
ESCAPE
и QUOTE
для команды PostgreSQLs COPY, за исключением того, что pipeline_kafka поддерживает дополнительный формат: json. Формат json интерпретирует каждое сообщение Kafka как объект JSON.
batchsize контролирует параметр batch_size
, передаваемый клиенту Kafka. Мы принудительно выполняем цикл COPY
и комита после буферизации сообщений batchsize
.
maxbytes контролирует параметр fetch.message.max.bytes
, передаваемый клиенту Kafka. Мы принудительно выполняем цикл COPY
и комита после буферизации данных maxbytes
.
start_offset указывает смещение, с которого начинается чтение партиций топика Kafka.
pipeline_kafka непрерывно сохраняет в базе данных текущее смещение, до которого были прочитаны данные. Если start_offset равен NULL
, мы начинаем с сохраненного смещения или с конца партиции, если нет сохраненного смещения. Значение start_offset -1 чтение начинается с конца каждой партиции, а -2 потребление начинается с начала каждой партиции. Использование любого другого значения start_offset было бы странным, поскольку смещения в разных партициях не взаимосвязаны.
pipeline_kafka.consume_begin()
То же, что и выше, но запускает все ранее созданные потребители вместо конкретной пары поток-топик.
pipeline_kafka.consume_end (topic text, stream text)
Завершает фоновые процессы потребителя для указанной пары поток-топик.
pipeline_kafka.consume_end()
То же, что и выше, но завершает все процессы потребителя.
Генерация сообщений
Добавлено в версии 0.9.1.
pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )
Производит одно сообщение и отправляет в целевой топик. Оба параметра partition и key являются необязательными. По умолчанию партиция остается неназначенной, поэтому брокер сам решает, в какую партицию отправить сообщение в зависимости от функции партицирования топика. Если необходимо отправить сообщение в конкретную партицию, укажите его как integer
. Ключ - это аргумент bytea
, который будет использоваться в качестве ключа для функции разделения.
pipeline_kafka.emit_tuple ( topic, partition, key )
Это триггерная функция, которую можно использовать для отправки кортежей в поток Kafka в формате JSON. Ее можно использовать только для триггера AFTER INSERT OR UPDATE
и FOR EACH ROW
. В случае UPDATE
, отправляются новые обновленные кортежи. Топик должен быть указан, в то время как partition и key являются необязательными. Поскольку это триггерная функция, все аргументы должны быть переданы как строковые литералы, и нет способа указать именованные аргументы. Если вы хотите указать только topic и key, используйте -1
в качестве раздела, при этом партиция останется без назначения. Ключ - это имя столбца в передаваемом кортеже, значение которого должно использоваться в качестве ключа партиции.
Metadata
pipeline_kafka использует несколько таблиц чтобы точно отслеживать собственное состояние при перезапуске системы:
pipeline_kafka.consumers
Хранит метаданные для каждого потребителя потока-топика, созданного pipeline_kafka.consume_begin.
pipeline_kafka.brokers
Хранит все брокеры Kafka, к которым могут подключаться потребители.
pipeline_kafka.offsets
Хранит смещение топиков Kafka, чтобы потребители могли начать чтение сообщений с того места, где они остановились перед завершением или перезапуском системы.
Примечание
См. SQL on Kafka для получения подробной информации по использованию Kafka с PipelineDB.
Amazon Kinesis
PipelineDB также поддерживает ввод данных из потоков Amazon Kinesis. Данная функциональность предоставляется расширением pipeline_kinesis. Внутри расширение управляет фоновыми рабочими процессами, которые потребляют данные, используя `AWS SDK`_, и копируют их в потоки pipeline.
Репозиторий для расширения находится здесь. Инструкции по сборке и установке расширения можно найти в файле README.md
.
Для активации расширения его необходимо явно загрузить:
# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION
Для запуска поглощения данных необходимо сначала сообщить конвейеру данных, где и как получить данные Kinesis, настроив конечную точку:
pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )
name - это уникальный идентификатор для конечной точки. region - это строка, идентифицирующая регион AWS, например, us-east-1
или us-west-2
.
credfile - это необязательный параметр, который позволяет переопределить расположение файла по умолчанию для учетных данных AWS.
url - это необязательный параметр, который позволяет использовать другой сервер Kinesis (не AWS). В основном применяется для тестирования с локальными серверами kinesis, типа kinesalite.
Потребление сообщений
pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := „text“, delimiter text := E“\t“, quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )
Запускает группу логических потребителей, которая потребляет сообщения kinesis из потока kinesis на конечной точке и копирует их в отношение потока конвейера данных.
Параллельная обработка данных используется для указания количества фоновых рабочих процессов, которые должны использоваться на каждого потребителя для балансировки нагрузки. Примечание - значение не нужно устанавливать на количество шардов, поскольку расширение по внутреннему устройству потоковое. Значения по умолчанию 1 достаточно, пока потребитель не начнет отставать.
format, delimiter, escape и quote - необязательные параметры, используемые для управления форматом скопированных строк, как в PostgreSQLs COPY.
batchsize передается в AWS SDK и управляет параметром Limit
, используемым в Kinesis GetRecords.
start_offset используется для управления позицией потока, с которой расширение начинает читать данные. -1 - чтение начинается с конца потока, а -2 - с начала. Внутренне они сопоставляются с TRIM_HORIZON
и LATEST
. См. Kinesis GetShardIterator для получения дополнительной информации.
pipeline_kinesis.consume_end (endpoint text, stream text, relation text)
Завершает все фоновые рабочие процессы для конкретного потребителя.
pipeline_kinesis.consume_begin()
Запускает все ранее созданные потребители.
pipeline_kinesis.consume_end()
Завершает все фоновые рабочие процессы для всех ранее запущенных потребителей.
Metadata
pipeline_kinesis использует несколько таблиц, чтобы точно отслеживать собственное состояние при перезапуске системы:
pipeline_kinesis.endpoints
Хранит метаданные для каждой конечной точки, созданной kinesis_add_endpoint
pipeline_kinsesis.consumers
Хранит метаданные для каждого потребителя, созданного kinesis_consume_begin.
pipeline_kinsesis.seqnums
Хранит метаданные для каждого потребителя на уровне шарда. А именно, seqnums.