46.6. Выходные плагины логического декодирования#

46.6. Выходные плагины логического декодирования

46.6. Выходные плагины логического декодирования #

Пример плагина вывода можно найти в подкаталоге contrib/test_decoding дерева исходного кода PostgreSQL.

46.6.1. Функция инициализации #

Выходной плагин загружается путем динамической загрузки общей библиотеки с именем выходного плагина в качестве базового имени библиотеки. Для поиска библиотеки используется нормальный путь поиска библиотек. Чтобы предоставить необходимые обратные вызовы выходного плагина и указать, что библиотека на самом деле является выходным плагином, она должна предоставить функцию с именем _PG_output_plugin_init. Эта функция получает структуру, которую необходимо заполнить указателями на функции обратного вызова для отдельных действий.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

begin_cb, change_cb и commit_cb обратные вызовы обязательны, в то время как startup_cb, truncate_cb, message_cb, filter_by_origin_cb и shutdown_cb являются необязательными. Если truncate_cb не установлен, но TRUNCATE должен быть декодирован, действие будет проигнорировано.

Плагин вывода также может определять функции для поддержки потоковой передачи больших, незавершенных транзакций. stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb и stream_change_cb являются обязательными, в то время как stream_message_cb и stream_truncate_cb являются необязательными. stream_prepare_cb также является обязательной, если плагин вывода поддерживает двухфазные фиксации.

Плагин вывода также может определять функции для поддержки двухфазных фиксаций, что позволяет декодировать действия на PREPARE TRANSACTION. Обратные вызовы begin_prepare_cb, prepare_cb, commit_prepared_cb и rollback_prepared_cb являются обязательными, в то время как filter_prepare_cb является необязательным. stream_prepare_cb также обязателен, если плагин вывода также поддерживает потоковую передачу больших незавершенных транзакций.

46.6.2. Возможности #

Для декодирования, форматирования и вывода изменений, плагины вывода могут использовать большую часть нормальной инфраструктуры сервера, включая вызов функций вывода. Разрешен только чтение отношений, при условии, что обращение происходит только к отношениям, созданным командой initdb в схеме pg_catalog, или отмеченным как пользовательские каталоговые таблицы.

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

Обратите внимание, что доступ к таблицам пользовательского каталога или обычным системным каталогам в плагинах вывода должен осуществляться только через API сканирования systable_*. Доступ через API сканирования heap_* вызовет ошибку. Кроме того, запрещены любые действия, приводящие к присвоению идентификатора транзакции. Сюда входят, среди прочего, запись в таблицы, выполнение изменений DDL и вызов pg_current_xact_id().

46.6.3. Режимы вывода #

Выводные обратные вызовы плагина могут передавать данные потребителю в почти произвольных форматах. Для некоторых случаев использования, например, просмотра изменений через SQL, возвращение данных в типе данных, который может содержать произвольные данные (например, bytea), неудобно. Если выводной плагин выводит только текстовые данные в кодировке сервера, он может объявить это, установив OutputPluginOptions.output_type в OUTPUT_PLUGIN_TEXTUAL_OUTPUT вместо OUTPUT_PLUGIN_BINARY_OUTPUT в обратном вызове startup. В этом случае все данные должны быть в кодировке сервера, чтобы text мог содержать их. Это проверяется в сборках с включенными утверждениями.

46.6.4. Вызовы обратного вызова выходного плагина #

Выходной плагин получает уведомления о происходящих изменениях через различные обратные вызовы, которые он должен предоставить.

Все параллельные транзакции декодируются в порядке коммита, и только изменения, принадлежащие конкретной транзакции, декодируются между обратными вызовами begin и commit. Транзакции, явно или неявно откатившиеся, никогда не декодируются. Успешные точки сохранения объединяются в транзакцию, содержащую их, в порядке их выполнения внутри этой транзакции. Транзакция, подготовленная для двухфазного коммита с использованием PREPARE TRANSACTION, также будет декодирована, если предоставлены обратные вызовы плагина вывода, необходимые для декодирования. Возможно, что текущая подготовленная транзакция, которая декодируется, будет аннулирована параллельно с помощью команды ROLLBACK PREPARED. В этом случае логическое декодирование этой транзакции также будет прервано. Все изменения такой транзакции пропускаются после обнаружения отката и вызова обратного вызова prepare_cb. Таким образом, даже в случае параллельного отката предоставляется достаточно информации плагину вывода, чтобы он должным образом обработал команду ROLLBACK PREPARED, как только она будет декодирована.

Примечание

Только транзакции, которые уже безопасно записаны на диск, будут декодированы. Это может привести к тому, что COMMIT не будет сразу же декодирован в непосредственно следующем вызове pg_logical_slot_get_changes(), когда значение synchronous_commit установлено в off.

46.6.4.1. Запуск обратного вызова #

Опциональный обратный вызов startup_cb вызывается каждый раз, когда создается слот репликации или запрашивается поток изменений, независимо от количества готовых к отправке изменений.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

Параметр is_init будет равен true, когда слот репликации создается, и false в противном случае. options указывает на структуру опций, которые могут быть установлены плагинами вывода:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type должен быть установлен либо в OUTPUT_PLUGIN_TEXTUAL_OUTPUT, либо в OUTPUT_PLUGIN_BINARY_OUTPUT. См. также Раздел 46.6.3. Если receive_rewrites установлено в true, плагин вывода также будет вызываться для изменений, внесенных перезаписью кучи во время некоторых операций DDL. Это интересно для плагинов, обрабатывающих репликацию DDL, но требуется особая обработка.

Коллбэк-функция запуска должна проверить наличие опций в ctx->output_plugin_options. Если выходной плагин требует наличия состояния, он может использовать ctx->output_plugin_private для его хранения.

46.6.4.2. Shutdown Callback #

Опциональный обратный вызов shutdown_cb вызывается, когда ранее активный слот репликации больше не используется и может использоваться для освобождения ресурсов, приватных для выходного плагина. Слот не обязательно удаляется, потоковая передача просто останавливается.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

46.6.4.3. Начало транзакции обратного вызова #

Требуемый обратный вызов begin_cb вызывается каждый раз, когда началась декодировка подтвержденной транзакции. Отмененные транзакции и их содержимое никогда не декодируются.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

Параметр txn содержит метаинформацию о транзакции, такую как временная метка, когда она была подтверждена, и ее XID.

46.6.4.4. Обратный вызов окончания транзакции #

Требуемый обратный вызов commit_cb вызывается каждый раз, когда транзакция коммитится. Обратные вызовы change_cb для всех измененных строк будут вызваны до этого, если были измененные строки.

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

46.6.4.5. Изменить обратный вызов #

Требуется обратный вызов change_cb, который вызывается для каждой отдельной модификации строки внутри транзакции, будь то INSERT, UPDATE или DELETE. Даже если исходная команда модифицировала несколько строк одновременно, обратный вызов будет вызываться отдельно для каждой строки. Обратный вызов change_cb может получать доступ к системным или пользовательским каталогам для помощи в процессе вывода деталей модификации строки. В случае декодирования подготовленной (но еще не подтвержденной) транзакции или декодирования неподтвержденной транзакции, этот обратный вызов изменения также может завершиться с ошибкой из-за одновременного отката этой же самой транзакции. В этом случае логическое декодирование этой прерванной транзакции прекращается плавно.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

Параметры ctx и txn имеют тот же содержимое, что и для обратных вызовов begin_cb и commit_cb, но дополнительно передаются дескриптор отношения relation, указывающий на отношение, к которому принадлежит строка, и структура change, описывающая модификацию строки.

Примечание

С помощью логического декодирования можно извлекать только изменения в пользовательских таблицах, которые не являются нефиксируемыми (см. UNLOGGED) и не являются временными (см. TEMPORARY or TEMP).

46.6.4.6. Обратный вызов Truncate #

Необязательный обратный вызов truncate_cb вызывается для команды TRUNCATE.

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

Параметры аналогичны обратному вызову change_cb. Однако, поскольку действия TRUNCATE на таблицах, связанных внешними ключами, должны выполняться вместе, этот обратный вызов получает массив отношений вместо одного. См. описание оператора TRUNCATE для получения подробной информации.

46.6.4.7. Фильтр обратного вызова источника #

Опциональный обратный вызов filter_by_origin_cb вызывается для определения, являются ли данные, воспроизведенные из origin_id, интересными для выходного плагина.

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

Параметр ctx имеет тот же контент, что и для других обратных вызовов. Нет доступной информации, кроме источника. Чтобы указать, что изменения, исходящие от переданного узла, не имеют значения, верните true, что приведет к их фильтрации; в противном случае - false. Для транзакций и изменений, которые были отфильтрованы, другие обратные вызовы не будут вызываться.

Это полезно при реализации каскадных или многонаправленных решений репликации. Фильтрация по источнику позволяет предотвратить повторная репликация одних и тех же изменений в таких настройках. Хотя транзакции и изменения также содержат информацию об источнике, фильтрация через этот обратный вызов заметно более эффективна.

46.6.4.8. Общий обратный вызов сообщений #

Опциональный обратный вызов message_cb вызывается каждый раз, когда декодировано сообщение логического декодирования.

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

Параметр txn содержит метаинформацию о транзакции, такую как временная метка, когда она была подтверждена, и ее XID. Однако обратите внимание, что он может быть NULL, когда сообщение не является транзакционным и XID еще не был назначен в транзакции, которая записала сообщение. Параметр lsn содержит местоположение WAL сообщения. Параметр transactional указывает, было ли сообщение отправлено как транзакционное или нет. Аналогично обратному вызову изменений, в случае декодирования подготовленной (но еще не подтвержденной) транзакции или декодирования неподтвержденной транзакции, этот обратный вызов сообщений также может завершиться с ошибкой из-за одновременного отката этой же самой транзакции. В этом случае логическое декодирование этой прерванной транзакции прекращается плавно. Параметр prefix - это произвольный префикс с нулевым завершением, который может использоваться для идентификации интересующих сообщений для текущего плагина. И, наконец, параметр message содержит фактическое сообщение размером message_size.

Следует обратить особое внимание на то, чтобы префикс, который рассматривает выходной плагин, был уникальным. Часто хорошим выбором является использование имени расширения или самого выходного плагина.

46.6.4.9. Подготовка обратного вызова фильтра #

Необязательный обратный вызов filter_prepare_cb вызывается для определения того, должны ли данные, являющиеся частью текущей двухфазной транзакции подготовки, рассматриваться для декодирования на этапе подготовки или позже, как обычная однофазная транзакция при выполнении команды COMMIT PREPARED. Чтобы указать, что декодирование должно быть прне указано, верните true; в противном случае верните false. Когда обратный вызов не определен, предполагается false (т.е. нет фильтрации, все транзакции, использующие двухфазный коммит, также декодируются в две фазы).

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

Параметр ctx содержит те же данные, что и для других обратных вызовов. Параметры xid и gid предоставляют два разных способа идентификации транзакции. Позднее выполнение команды COMMIT PREPARED или ROLLBACK PREPARED содержит оба идентификатора, что позволяет плагину вывода выбрать, что использовать.

Коллбэк может быть вызван несколько раз за транзакцию для декодирования и должен предоставлять одинаковый статический ответ для заданной пары xid и gid каждый раз, когда он вызывается.

46.6.4.10. Начало транзакции Подготовка Обратного вызова #

Требуемый обратный вызов begin_prepare_cb вызывается, когда начало подготовленной транзакции было декодировано. Поле gid, которое является частью параметра txn, может быть использовано в этом обратном вызове для проверки, была ли уже получена этой плагином команда PREPARE, в таком случае он может либо вызвать ошибку, либо пропустить оставшиеся изменения транзакции.

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

46.6.4.11. Обратный вызов подготовки транзакции #

Требуемый обратный вызов prepare_cb вызывается каждый раз, когда транзакция, подготовленная для двухфазного коммита, была декодирована. Обратный вызов change_cb для всех измененных строк будет вызван до этого, если были изменены какие-либо строки. Поле gid, которое является частью параметра txn, может быть использовано в этом обратном вызове.

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

46.6.4.12. Обратный вызов коммита транзакции #

Требуемый обратный вызов commit_prepared_cb вызывается каждый раз, когда транзакция COMMIT PREPARED была декодирована. Поле gid, которое является частью параметра txn, может быть использовано в этом обратном вызове.

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

46.6.4.13. Обратный вызов отката транзакции, подготовленный #

Необходимый обратный вызов rollback_prepared_cb вызывается каждый раз, когда декодирована команда ROLLBACK PREPARED. Поле gid, которое является частью параметра txn, может использоваться в этом обратном вызове. Параметры prepare_end_lsn и prepare_time могут использоваться для проверки, была ли получена плагином команда PREPARE TRANSACTION, в таком случае можно выполнить откат, в противном случае можно пропустить операцию отката. Только поле gid недостаточно, потому что у удаленного узла может быть подготовленная транзакция с тем же идентификатором.

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

46.6.4.14. Обратный вызов начала потока #

Требуемый обратный вызов stream_start_cb вызывается при открытии блока потоковых изменений из выполняемой транзакции.

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

46.6.4.15. Обратный вызов остановки потока #

Требуемый stream_stop_cb callback вызывается при закрытии блока потоковых изменений из выполняемой транзакции.

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

46.6.4.16. Обратный вызов прерывания потока #

Требуемый обратный вызов stream_abort_cb вызывается для прерывания ранее переданной транзакции.

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

46.6.4.17. Обратный вызов подготовки потока #

stream_prepare_cb обратный вызов вызывается для подготовки ранее переданной транзакции в рамках двухфазного коммита. Этот обратный вызов необходим, когда плагин вывода поддерживает как потоковую передачу крупных незавершенных транзакций, так и двухфазные коммиты.

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

46.6.4.18. Обратный вызов потока коммита #

Требуемый обратный вызов stream_commit_cb вызывается для фиксации ранее переданной транзакции.

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

46.6.4.19. Обратный вызов изменения потока #

Требуемый обратный вызов stream_change_cb вызывается при отправке изменения в блоке потоковых изменений (разграниченных вызовами stream_start_cb и stream_stop_cb). Фактические изменения не отображаются, так как транзакция может быть отменена позже, и мы не декодируем изменения для отмененных транзакций.

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

46.6.4.20. Обратный вызов потока сообщений #

Необязательный обратный вызов stream_message_cb вызывается при отправке общего сообщения в блоке потоковых изменений (разграниченных вызовами stream_start_cb и stream_stop_cb). Содержимое сообщений для транзакционных сообщений не отображается, так как транзакция может быть прервана позже, и мы не декодируем изменения для прерванных транзакций.

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

46.6.4.21. Обратный вызов обрезки потока #

Необязательный обратный вызов stream_truncate_cb вызывается для команды TRUNCATE в блоке потоковых изменений (разграниченных вызовами stream_start_cb и stream_stop_cb).

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

Параметры аналогичны обратному вызову stream_change_cb. Однако, поскольку действия TRUNCATE на таблицах, связанных внешними ключами, должны выполняться вместе, этот обратный вызов получает массив отношений вместо одного. Подробности см. в описании оператора TRUNCATE.

46.6.5. Функции для создания вывода #

Для фактического вывода данных, плагины вывода могут записывать данные в буфер вывода StringInfo в ctx->out при нахождении в обратных вызовах begin_cb, commit_cb или change_cb. Перед записью в буфер вывода необходимо вызвать OutputPluginPrepareWrite(ctx, last_write), а после завершения записи в буфер необходимо вызвать OutputPluginWrite(ctx, last_write) для выполнения записи. Параметр last_write указывает, была ли данная запись последней в обратном вызове.

Следующий пример показывает, как вывести данные потребителю плагина вывода:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);