47.6. Выходные плагины логического декодирования#
47.6. Выходные плагины логического декодирования
Пример плагина вывода можно найти в подкаталоге
contrib/test_decoding
дерева исходного кода PostgreSQL.
47.6.1. Функция инициализации
Выходной плагин загружается путем динамической загрузки общей библиотеки с именем выходного плагина в качестве базового имени библиотеки. Для поиска библиотеки используется нормальный путь поиска библиотек. Чтобы предоставить необходимые обратные вызовы выходного плагина и указать, что библиотека на самом деле является выходным плагином, она должна предоставить функцию с именем _PG_output_plugin_init
. Эта функция получает структуру, которую необходимо заполнить указателями на функции обратного вызова для отдельных действий.
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Колбэки begin_cb
, change_cb
и commit_cb
обязательны, в то время как startup_cb
, filter_by_origin_cb
, truncate_cb
и shutdown_cb
являются необязательными. Если truncate_cb
не установлен, но требуется декодировать TRUNCATE
, действие будет проигнорировано.
An output plugin may also define functions to support streaming of large,
in-progress transactions. Функции stream_start_cb
,
stream_stop_cb
, stream_abort_cb
,
stream_commit_cb
, stream_change_cb
,
и stream_prepare_cb
обязательны, в то время как функции stream_message_cb
и
stream_truncate_cb
являются необязательными.
Выходной плагин также может определять функции для поддержки двухфазных коммитов,
что позволяет выполнять действия при декодировании команды PREPARE TRANSACTION
.
Требуются обратные вызовы begin_prepare_cb
, prepare_cb
,
stream_prepare_cb
,
commit_prepared_cb
и rollback_prepared_cb
,
а filter_prepare_cb
является необязательным.
47.6.2. Возможности
Для декодирования, форматирования и вывода изменений, плагины вывода могут использовать большую часть нормальной инфраструктуры сервера, включая вызов функций вывода. Разрешен только чтение отношений, при условии, что обращение происходит только к отношениям, созданным командой initdb
в схеме pg_catalog
, или отмеченным как пользовательские каталоговые таблицы.
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
Обратите внимание, что доступ к таблицам пользовательского каталога или обычным системным каталогам в плагинах вывода должен осуществляться только через API сканирования systable_*
. Доступ через API сканирования heap_*
вызовет ошибку. Кроме того, запрещены любые действия, приводящие к присвоению идентификатора транзакции. Сюда входят, среди прочего, запись в таблицы, выполнение изменений DDL и вызов pg_current_xact_id()
.
47.6.3. Режимы вывода
Выводные обратные вызовы плагина могут передавать данные потребителю в почти произвольных форматах. Для некоторых случаев использования, например, просмотра изменений через SQL, возвращение данных в типе данных, который может содержать произвольные данные (например, bytea
), неудобно. Если выводной плагин выводит только текстовые данные в кодировке сервера, он может объявить это, установив OutputPluginOptions.output_type
в OUTPUT_PLUGIN_TEXTUAL_OUTPUT
вместо OUTPUT_PLUGIN_BINARY_OUTPUT
в обратном вызове startup. В этом случае все данные должны быть в кодировке сервера, чтобы text
мог содержать их. Это проверяется в сборках с включенными утверждениями.
47.6.4. Вызовы обратного вызова выходного плагина
Выходной плагин получает уведомления о происходящих изменениях через различные обратные вызовы, которые он должен предоставить.
Все параллельные транзакции декодируются в порядке коммита, и только изменения, принадлежащие конкретной транзакции, декодируются между обратными вызовами begin
и commit
. Транзакции, явно или неявно откатившиеся, никогда не декодируются. Успешные точки сохранения объединяются в транзакцию, содержащую их, в порядке их выполнения внутри этой транзакции. Транзакция, подготовленная для двухфазного коммита с использованием PREPARE TRANSACTION
, также будет декодирована, если предоставлены обратные вызовы плагина вывода, необходимые для декодирования. Возможно, что текущая подготовленная транзакция, которая декодируется, будет аннулирована параллельно с помощью команды ROLLBACK PREPARED
. В этом случае логическое декодирование этой транзакции также будет прервано. Все изменения такой транзакции пропускаются после обнаружения отката и вызова обратного вызова prepare_cb
. Таким образом, даже в случае параллельного отката предоставляется достаточно информации плагину вывода, чтобы он должным образом обработал команду ROLLBACK PREPARED
, как только она будет декодирована.
Примечание
Только транзакции, которые уже безопасно записаны на диск, будут декодированы. Это может привести к тому, что COMMIT
не будет сразу же декодирован в непосредственно следующем вызове pg_logical_slot_get_changes()
, когда значение synchronous_commit
установлено в off
.
47.6.4.1. Запуск обратного вызова
Опциональный обратный вызов startup_cb
вызывается каждый раз, когда создается слот репликации или запрашивается поток изменений, независимо от количества готовых к отправке изменений.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
Параметр is_init
будет равен true, когда слот репликации создается, и false в противном случае. options
указывает на структуру опций, которые могут быть установлены плагинами вывода:
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
должен быть установлен либо в
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
,
либо в OUTPUT_PLUGIN_BINARY_OUTPUT
. См. также
Раздел 47.6.3.
Если receive_rewrites
установлено в true, плагин вывода
также будет вызываться для изменений, внесенных перезаписью кучи во время
некоторых операций DDL. Это интересно для плагинов, обрабатывающих репликацию DDL,
но требуется особая обработка.
Коллбэк-функция запуска должна проверить наличие опций в
ctx->output_plugin_options
. Если выходной плагин
требует наличия состояния, он может
использовать ctx->output_plugin_private
для его хранения.
47.6.4.2. Shutdown Callback
Опциональный обратный вызов shutdown_cb
вызывается, когда ранее активный слот репликации больше не используется и может использоваться для освобождения ресурсов, приватных для выходного плагина. Слот не обязательно удаляется, потоковая передача просто останавливается.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
47.6.4.3. Начало транзакции обратного вызова
Требуемый обратный вызов begin_cb
вызывается каждый раз, когда началась декодировка подтвержденной транзакции. Отмененные транзакции и их содержимое никогда не декодируются.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
Параметр txn
содержит метаинформацию о транзакции, такую как временная метка, когда она была подтверждена, и ее XID.
47.6.4.4. Обратный вызов окончания транзакции
Требуемый обратный вызов commit_cb
вызывается каждый раз, когда
транзакция коммитится.
Обратные вызовы change_cb
для всех измененных
строк будут вызваны до этого, если были измененные
строки.
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47.6.4.5. Изменить обратный вызов
Требуется обратный вызов change_cb
, который вызывается для каждой
отдельной модификации строки внутри транзакции, будь то
INSERT
, UPDATE
или
DELETE
. Даже если исходная команда модифицировала
несколько строк одновременно, обратный вызов будет вызываться отдельно для каждой
строки. Обратный вызов change_cb
может получать доступ к системным или
пользовательским каталогам для помощи в процессе вывода деталей модификации строки.
В случае декодирования подготовленной (но еще не подтвержденной) транзакции или декодирования неподтвержденной транзакции, этот
обратный вызов изменения также может завершиться с ошибкой из-за одновременного отката
этой же самой транзакции. В этом случае логическое декодирование этой
прерванной транзакции прекращается плавно.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
Параметры ctx
и txn
имеют тот же содержимое, что и для обратных вызовов begin_cb
и commit_cb
, но дополнительно передаются дескриптор отношения relation
, указывающий на отношение, к которому принадлежит строка, и структура change
, описывающая модификацию строки.
Примечание
С помощью логического декодирования можно извлекать только изменения в пользовательских таблицах, которые не являются нефиксируемыми (см. UNLOGGED
) и не являются временными (см. TEMPORARY
or TEMP
).
47.6.4.6. Обратный вызов Truncate
Коллбэк truncate_cb
вызывается для команды TRUNCATE
.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Параметры аналогичны обратному вызову change_cb
.
Однако, поскольку действия TRUNCATE
на таблицах, связанных внешними ключами, должны выполняться вместе, этот обратный вызов получает массив отношений вместо одного. См. описание оператора TRUNCATE для получения подробной информации.
47.6.4.7. Фильтр обратного вызова источника
Опциональный обратный вызов filter_by_origin_cb
вызывается для определения, являются ли данные, воспроизведенные из origin_id
, интересными для выходного плагина.
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
Параметр ctx
имеет тот же контент, что и для других обратных вызовов. Нет доступной информации, кроме источника. Чтобы указать, что изменения, исходящие от переданного узла, не имеют значения, верните true, что приведет к их фильтрации; в противном случае - false. Для транзакций и изменений, которые были отфильтрованы, другие обратные вызовы не будут вызываться.
Это полезно при реализации каскадных или многонаправленных решений репликации. Фильтрация по источнику позволяет предотвратить повторная репликация одних и тех же изменений в таких настройках. Хотя транзакции и изменения также содержат информацию об источнике, фильтрация через этот обратный вызов заметно более эффективна.
47.6.4.8. Общий обратный вызов сообщений
Опциональный обратный вызов message_cb
вызывается каждый раз, когда декодировано сообщение логического декодирования.
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
Параметр txn
содержит метаинформацию о транзакции, такую как временная метка, когда она была подтверждена, и ее XID. Однако обратите внимание, что он может быть NULL, когда сообщение не является транзакционным и XID еще не был назначен в транзакции, которая записала сообщение. Параметр lsn
содержит местоположение WAL сообщения. Параметр transactional
указывает, было ли сообщение отправлено как транзакционное или нет. Аналогично обратному вызову изменений, в случае декодирования подготовленной (но еще не подтвержденной) транзакции или декодирования неподтвержденной транзакции, этот обратный вызов сообщений также может завершиться с ошибкой из-за одновременного отката этой же самой транзакции. В этом случае логическое декодирование этой прерванной транзакции прекращается плавно.
Параметр prefix
- это произвольный префикс с нулевым завершением, который может использоваться для идентификации интересующих сообщений для текущего плагина. И, наконец, параметр message
содержит фактическое сообщение размером message_size
.
Следует обратить особое внимание на то, чтобы префикс, который рассматривает выходной плагин, был уникальным. Часто хорошим выбором является использование имени расширения или самого выходного плагина.
47.6.4.9. Подготовка обратного вызова фильтра
Необязательный обратный вызов filter_prepare_cb
вызывается для определения того, должны ли данные, являющиеся частью текущей двухфазной транзакции подготовки, рассматриваться для декодирования на этапе подготовки или позже, как обычная однофазная транзакция при выполнении команды COMMIT PREPARED
. Чтобы указать, что декодирование должно быть прне указано, верните true
; в противном случае верните false
. Когда обратный вызов не определен, предполагается false
(т.е. нет фильтрации, все транзакции, использующие двухфазный коммит, также декодируются в две фазы).
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
Параметр ctx
содержит те же данные, что и для
других обратных вызовов. Параметры xid
и gid
предоставляют два разных способа идентификации
транзакции. Позднее выполнение команды COMMIT PREPARED
или
ROLLBACK PREPARED
содержит оба идентификатора,
что позволяет плагину вывода выбрать, что использовать.
Коллбэк может быть вызван несколько раз за транзакцию для декодирования и должен предоставлять одинаковый статический ответ для заданной пары xid
и gid
каждый раз, когда он вызывается.
47.6.4.10. Начало транзакции Подготовка Обратного вызова
Требуемый обратный вызов begin_prepare_cb
вызывается, когда начало подготовленной транзакции было декодировано. Поле gid
, которое является частью параметра txn
, может быть использовано в этом обратном вызове для проверки, была ли уже получена этой плагином команда PREPARE
, в таком случае он может либо вызвать ошибку, либо пропустить оставшиеся изменения транзакции.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
47.6.4.11. Обратный вызов подготовки транзакции
Требуемый обратный вызов prepare_cb
вызывается каждый раз, когда
транзакция, подготовленная для двухфазного коммита, была
декодирована. Обратный вызов change_cb
для всех измененных
строк будет вызван до этого, если были изменены какие-либо строки.
Поле gid
, которое является частью
параметра txn
, может быть использовано в этом обратном вызове.
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
47.6.4.12. Обратный вызов коммита транзакции
Требуемый обратный вызов commit_prepared_cb
вызывается
каждый раз, когда транзакция COMMIT PREPARED
была декодирована.
Поле gid
, которое является частью
параметра txn
, может быть использовано в этом обратном вызове.
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47.6.4.13. Обратный вызов отката транзакции, подготовленный
Необходимый обратный вызов rollback_prepared_cb
вызывается каждый раз, когда декодирована команда ROLLBACK PREPARED
. Поле gid
, которое является частью параметра txn
, может использоваться в этом обратном вызове. Параметры prepare_end_lsn
и prepare_time
могут использоваться для проверки, была ли получена плагином команда PREPARE TRANSACTION
, в таком случае можно выполнить откат, в противном случае можно пропустить операцию отката. Только поле gid
недостаточно, потому что у удаленного узла может быть подготовленная транзакция с тем же идентификатором.
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
47.6.4.14. Обратный вызов начала потока
Колбэк stream_start_cb
вызывается при открытии блока потоковых изменений из транзакции, находящейся в процессе выполнения.
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
47.6.4.15. Обратный вызов остановки потока
Колбэк stream_stop_cb
вызывается при закрытии блока потоковых изменений из транзакции, находящейся в процессе выполнения.
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
47.6.4.16. Обратный вызов прерывания потока
Колбэк stream_abort_cb
вызывается для прерывания
ранее переданной транзакции.
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
47.6.4.17. Обратный вызов подготовки потока
Коллбэк stream_prepare_cb
вызывается для подготовки
ранее переданной транзакции в рамках двухфазного коммита.
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
47.6.4.18. Обратный вызов потока коммита
Коллбэк stream_commit_cb
вызывается для коммита
ранее переданной транзакции.
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47.6.4.19. Обратный вызов изменения потока
Коллбэк stream_change_cb
вызывается при отправке изменения в блоке потоковых изменений (ограниченных вызовами stream_start_cb
и stream_stop_cb
).
Фактические изменения не отображаются, так как транзакция может быть отменена позже, и мы не декодируем изменения для отмененных транзакций.
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
47.6.4.20. Обратный вызов потока сообщений
Коллбэк stream_message_cb
вызывается при отправке
общего сообщения в блоке потоковых изменений (ограниченного вызовами
stream_start_cb
и stream_stop_cb
).
Содержимое сообщений транзакций не отображается, так как транзакция
может быть отменена впоследствии, и мы не декодируем изменения для отмененных
транзакций.
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
47.6.4.21. Обратный вызов обрезки потока
Коллбэк stream_truncate_cb
вызывается для команды TRUNCATE
в блоке потоковых изменений (ограниченном вызовами stream_start_cb
и stream_stop_cb
).
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Параметры аналогичны обратному вызову stream_change_cb
.
Однако, поскольку действия TRUNCATE
на таблицах, связанных внешними ключами, должны выполняться вместе, этот обратный вызов получает массив отношений вместо одного. Подробности см. в описании оператора TRUNCATE.
47.6.5. Функции для создания вывода
Для фактического вывода данных, плагины вывода могут записывать данные в буфер вывода StringInfo
в ctx->out
при нахождении в обратных вызовах begin_cb
, commit_cb
или change_cb
. Перед записью в буфер вывода необходимо вызвать OutputPluginPrepareWrite(ctx, last_write)
, а после завершения записи в буфер необходимо вызвать OutputPluginWrite(ctx, last_write)
для выполнения записи. Параметр last_write
указывает, была ли данная запись последней в обратном вызове.
Следующий пример показывает, как вывести данные потребителю плагина вывода:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);