F.70. wal2json#
F.70. wal2json #
wal2json является плагином вывода для логического декодирования. Это означает, что плагин имеет доступ к кортежам, создаваемым INSERT и UPDATE. Также, старые версии строк для UPDATE/DELETE могут быть доступны в зависимости от настроенной идентичности реплики. Изменения могут быть потреблены с использованием потокового протокола (слоты логической репликации) или через специальный SQL API.
формат версии 1
создает JSON-объект для каждой транзакции. Все новые/старые кортежи доступны в JSON-объекте. Также есть опции для включения таких свойств, как временная метка транзакции, схема-квалифицированная, типы данных и идентификаторы транзакций.
формат версии 2
создает JSON
объект для каждой кортежа. Необязательный JSON объект для начала и конца
транзакции. Также существует множество опций для включения
свойств.
F.70.2. Конфигурация #
F.70.2.1. postgresql.conf #
Вам нужно настроить как минимум два параметра в postgresql.conf:
wal_level = logical # ## these parameters only need to set in versions 9.4, 9.5 and 9.6 ## default values are ok in version 10 or later # max_replication_slots = 10 max_wal_senders = 10
После изменения этих параметров требуется перезапуск.
F.70.2.2. Параметры #
include-xids
: добавляет xid к каждому набору изменений. По умолчанию false.include-timestamp
: добавляет метку времени к каждому набору изменений. По умолчанию false.include-schemas
: добавьте схему к каждому изменению. По умолчанию true.include-types
: добавлять тип к каждому изменению. По умолчанию true.include-typmod
: добавлять модификатор к типам, которые его имеют (например, varchar(20) вместо varchar). По умолчанию true.include-type-oids
: добавить OID типов. По умолчанию false.include-domain-data-type
: заменить имя домена на базовый тип данных. По умолчанию false.include-column-positions
: добавить позицию столбца (pg_attribute.attnum). По умолчанию false.include-origin
: добавить источник данных. По умолчанию false.include-not-null
: добавьте информацию not null как columnoptionals. По умолчанию false.include-default
: добавить выражение по умолчанию. По умолчанию false.include-pk
: добавляет информацию о первичном ключе как pk. Имя столбца и тип данных включены. По умолчанию false.numeric-data-types-as-string
: используйте строку для числовых типов данных. Спецификация JSON не признаетInfinity
иNaN
как допустимые числовые значения. Это может вызвать потенциальные проблемы совместимости для чисел двойной точности. По умолчанию false.pretty-print
: добавляет пробелы и отступы к структурам JSON. По умолчанию false.write-in-chunks
: записывать после каждого изменения вместо каждого набора изменений. Используется только, когдаformat-version
равен1
. По умолчанию false.include-lsn
: добавляет nextlsn к каждому набору изменений. По умолчанию false.include-transaction
: издавать записи, обозначающие начало и конец каждой транзакции. По умолчанию true.include-unchanged-toast
(устарело): Не используйте это. Это устарело.filter-origins
: исключить изменения из указанных источников. По умолчанию пусто, что означает, что никакой источник не будет отфильтрован. Это значение, разделенное запятыми.filter-tables
: исключить строки из указанных таблиц. По умолчанию пусто, что означает, что ни одна таблица не будет отфильтрована. Это значение, разделенное запятыми. Таблицы должны быть указаны с учетом схемы.*.foo
означает таблицу foo во всех схемах, аbar.*
означает все таблицы в схеме bar. Специальные символы (пробел, одинарная кавычка, запятая, точка, звездочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру. Таблица"public"."Foo bar"
должна быть указана какpublic.Foo\ bar
.add-tables
: включать только строки из указанных таблиц. По умолчанию все таблицы из всех схем. Имеет те же правила, что иfilter-tables
.filter-msg-prefixes
: исключить сообщения, если префикс находится в списке. По умолчанию пусто, что означает, что сообщения не будут фильтроваться. Это значение, разделенное запятыми.add-msg-prefixes
: включать только сообщения, если префикс находится в списке. По умолчанию все префиксы. Это значение, разделенное запятыми.wal2json
применяетfilter-msg-prefixes
перед этим параметром.format-version
: определяет, какой формат использовать. По умолчанию 1.actions
: определяет, какие операции будут отправлены. По умолчанию это все действия (вставка, обновление, удаление и усечение). Однако, если вы используетеformat-version
1, усечение не включено (обратная совместимость).
F.70.3. Примеры #
Есть два способа получить изменения (объекты JSON) из плагина wal2json: вызов функций через SQL или pg_recvlogical.
F.70.3.1. pg_recvlogical #
Помимо вышеуказанной конфигурации, необходимо настроить
подключение для репликации, чтобы использовать pg_recvlogical. Логическое
подключение для репликации в версиях 9.4, 9.5 и 9.6 требует
ключевое слово replication
в столбце базы данных.
Начиная с версии 10, логическая репликация соответствует обычной записи
с именем базы данных или такими ключевыми словами, как all
.
Сначала добавьте правило подключения репликации в pg_hba.conf (9.4, 9.5 и 9.6):
local replication myuser trust
Если вы используете версию 10 или более позднюю:
local mydatabase myuser trust
Также установите max_wal_senders в postgresql.conf:
max_wal_senders = 1
Перезапуск необходим, если вы изменили max_wal_senders.
Вы готовы попробовать wal2json. В одном терминале:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
В другом терминале:
$ cat /tmp/example1.sql CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); BEGIN; INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table1_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table1_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir'); -- it is not added to stream because there isn't a pk or a replica identity UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; DROP TABLE table1_with_pk; DROP TABLE table1_without_pk; $ psql -At -f /tmp/example1.sql postgres CREATE TABLE CREATE TABLE BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78BFC828 3/78BFC880 DELETE 2 3/78BFC990 INSERT 0 1 UPDATE 1 COMMIT DROP TABLE DROP TABLE
Вывод в первом терминале:
{ "change": [ ] } { "change": [ ] } { "change": [ { "kind": "message", "transactional": false, "prefix": "wal2json", "content": "this non-transactional message will be delivered even if you rollback the transaction" } ] } WARNING: table "table1_without_pk" without primary key or replica identity is nothing { "change": [ { "kind": "insert", "schema": "public", "table": "table1_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 11:58:28.988414"] } ,{ "kind": "insert", "schema": "public", "table": "table1_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 11:58:28.988414"] } ,{ "kind": "insert", "schema": "public", "table": "table1_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 11:58:28.988414"] } ,{ "kind": "message", "transactional": true, "prefix": "wal2json", "content": "this message will be delivered" } ,{ "kind": "delete", "schema": "public", "table": "table1_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 11:58:28.988414"] } } ,{ "kind": "delete", "schema": "public", "table": "table1_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 11:58:28.988414"] } } ,{ "kind": "insert", "schema": "public", "table": "table1_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] } { "change": [ ] } { "change": [ ] }
Удаление слота в первом терминале:
Ctrl+C $ pg_recvlogical -d postgres --slot test_slot --drop-slot
F.70.3.2. SQL Функции #
$ cat /tmp/example2.sql CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); BEGIN; INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table2_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table2_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir'); -- it is not added to stream because there isn't a pk or a replica identity UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); DROP TABLE table2_with_pk; DROP TABLE table2_without_pk;
Сценарий выше создает вывод ниже:
$ psql -At -f /tmp/example2.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78C2CA50 3/78C2CAA8 DELETE 2 3/78C2CBD8 INSERT 0 1 UPDATE 1 COMMIT { "change": [ { "kind": "message", "transactional": false, "prefix": "wal2json", "content": "this non-transactional message will be delivered even if you rollback the transaction" } ] } psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing { "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "message", "transactional": true, "prefix": "wal2json", "content": "this message will be delivered" } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] } stop DROP TABLE DROP TABLE
Давайте повторим тот же пример с
format-version
2:
$ cat /tmp/example3.sql CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); BEGIN; INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table3_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table3_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir'); -- it is not added to stream because there isn't a pk or a replica identity UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); DROP TABLE table3_with_pk; DROP TABLE table3_without_pk;
Сценарий выше создает вывод ниже:
$ psql -At -f /tmp/example3.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78CB8F30 3/78CB8F88 DELETE 2 3/78CB90B8 INSERT 0 1 UPDATE 1 COMMIT psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk" {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"} {"action":"B"} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"} {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]} {"action":"C"} stop DROP TABLE DROP TABLE
F.70.4. Лицензия #
Copyright (c) 2013-2024, Euler Taveira de Oliveira Все права защищены.
Перераспространение и использование в исходной и бинарной формах, с изменениями или без, разрешены при соблюдении следующих условий:
Распространения исходного кода должны сохранять указанное выше уведомление об авторских правах, этот список условий и следующий отказ от ответственности.
Распространения в двоичной форме должны воспроизводить вышеуказанное уведомление об авторских правах, этот список условий и следующий отказ от ответственности в документации и/или других материалах, предоставляемых с распространением.
Ни имя Эйлера Тавейры де Оливейры, ни имена его участников не могут быть использованы для поддержки или продвижения продуктов, созданных на основе этого программного обеспечения, без предварительного письменного разрешения.
ЭТО ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЯЕТСЯ ПРАВООБЛАДАТЕЛЯМИ И УЧАСТНИКАМИ «КАК ЕСТЬ», И ЛЮБЫЕ ЯВНЫЕ ИЛИ ПОДРАЗУМЕВАЕМЫЕ ГАРАНТИИ, ВКЛЮЧАЯ, НО НЕ ОГРАНИЧИВАЯСЬ, ПОДРАЗУМЕВАЕМЫЕ ГАРАНТИИ КОММЕРЧЕСКОЙ ПРИГОДНОСТИ И ПРИГОДНОСТИ ДЛЯ ОПРЕДЕЛЕННОЙ ЦЕЛИ, ОТКЛОНЯЮТСЯ. НИ ПРИ КАКИХ ОБСТОЯТЕЛЬСТВАХ ПРАВООБЛАДАТЕЛЬ ИЛИ УЧАСТНИКИ НЕ НЕСУТ ОТВЕТСТВЕННОСТИ ЗА ЛЮБЫЕ ПРЯМЫЕ, КОСВЕННЫЕ, СЛУЧАЙНЫЕ, ОСОБЫЕ, ПРИМЕРНЫЕ ИЛИ ПОСЛЕДУЮЩИЕ УБЫТКИ (ВКЛЮЧАЯ, НО НЕ ОГРАНИЧИВАЯСЬ, ПРИОБРЕТЕНИЕ ТОВАРОВ ИЛИ УСЛУГ; ПОТЕРЮ ИСПОЛЬЗОВАНИЯ, ДАННЫХ ИЛИ ПРИБЫЛИ; ИЛИ ПРЕРЫВАНИЕ ДЕЯТЕЛЬНОСТИ) НЕЗАВИСИМО ОТ ПРИЧИНЫ И НА ЛЮБОЙ ТЕОРИИ ОТВЕТСТВЕННОСТИ, БУДЬ ТО В ДОГОВОРЕ, СТРОГОЙ ОТВЕТСТВЕННОСТИ ИЛИ ДЕЛИКТЕ (ВКЛЮЧАЯ НЕБРЕЖНОСТЬ ИЛИ ИНОЕ), ВОЗНИКАЮЩИЕ ЛЮБЫМ ОБРАЗОМ ИЗ ИСПОЛЬЗОВАНИЯ ЭТОГО ПРОГРАММНОГО ОБЕСПЕЧЕНИЯ, ДАЖЕ ЕСЛИ БЫЛО СООБЩЕНО О ВОЗМОЖНОСТИ ТАКОГО УЩЕРБА.