F.44. PGQ#
F.44. PGQ #
F.44.2. Описание #
PGQ - это решение для очередей от Skytools. Решение для репликации Londiste_Tutorial - это фоновый потребитель, построенный на основе PGQ, и API доступен для создания любого асинхронного обработчика, основанного на очередях.
Подсказка: Начиная с 2021 года есть предложение о дополнительной части стандарта SQL: SQL/PGQ" (Property Graph Queries). Не путайте данную информацию с этим предложением.
F.44.3. Какие проблемы решает PGQ? #
PGQ решит асинхронную пакетную обработку живых транзакций.
Это означает, что вы выполняете некоторые INSERT/DELETE/UPDATE
строк из
вашей рабочей среды, и нужно вызвать некоторые действия, но
не в момент COMMIT
, а позже. Не в слишком отдаленном будущем,
просто асинхронно: без блокировки рабочей транзакции.
Каждое приложение определенного размера будет нуждаться в отложенной обработке некоторых операций на более поздний срок, и PGQ - это универсальное высокопроизводительное решение, созданное для PostgreSQL, которое позволяет реализовать именно это: пакетную обработку. PGQ будет заботиться о асинхронном потреблении событий, управлении ошибками, поведении очереди и т. д., и он поставляется с простым SQL API для этого.
F.44.4. Установка и настройка #
Вам потребуется иметь экземпляр тикера, работающий на базе данных, где происходит генерация событий, что означает, что вам нужно предоставить файл конфигурации ticker.ini и запустить демона тикера.
F.44.4.1. Тикер #
См. Руководство по Londiste, The_ticker_daemon для введения в тему, примера конфигурации и инструкций по запуску демона.
Тикер будет генерировать тики, которые затем будут служить границами для пакетных событий. Эти пакеты генерируются по требованию, каждый раз, когда потребитель запрашивает новый пакет, и будут генерироваться таким образом, чтобы каждый пакет содержал либо события, стоимостью в ticker_max_lag секунд, либо ticker_max_count событий, в зависимости от того, что произойдет раньше.
Демон отвечает за обслуживание очередей, что достигается путем ротации событий INSERT в 3 таблицах и использования операции TRUNCATE, как только одна из таблиц больше не содержит необработанных событий.
F.44.4.2. Производство и потребление событий #
Чтобы не потерять ни одно событие, убедитесь, что есть хотя бы один зарегистрированный потребитель перед тем, как производить какое-либо событие, так как события, в которых никто не заинтересован, теряются.
Вы можете иметь столько потребителей, сколько вам нужно, в одной очереди событий, но все они будут видеть одни и те же события, а не разделять нагрузку. Это имеет смысл, если учесть, что основное применение - репликация: потребители являются репликами, и когда их больше одного, нужно, чтобы все они обрабатывали одни и те же события, чтобы все реплики имели одинаковый набор данных.
F.44.5. Генерация событий #
Документация вполне ясно говорит об этом, вы можете создавать события либо с использованием API на основе функций, либо с использованием триггеров. Последний вариант предпочтительнее, так как он обеспечивает простую проверку типов данных SQL и известный интерфейс для вставки событий, классическую INSERT INTO.
F.44.6. Написание потребителя PGQ #
Вашим лучшим вариантом будет написать код потребителя на Python, следующим лучшим вариантом - PHP. Если нужно использовать существующий код, написанный на другом языке, вам придется самостоятельно написать код поддержки демона операционной системы и цикл вокруг точек входа API PGQ.
Вся логика PGQ написана в виде расширения PostgreSQL и кода на стороне сервера, поэтому все, что вам нужно от вашего языка, это способ подключиться к вашему серверу PostgreSQL и вызвать функции.
F.44.6.1. Как использовать PGQ SQL API #
Сначала вам понадобится, чтобы ваш код потребителя мог зарегистрироваться и отменить регистрацию в существующей очереди.
Затем основная идея довольно проста: потребитель циклически вызывает pgq.next_batch() как можно быстрее, пока не получит NULL, вот тогда он должен отдохнуть.
Пожалуйста, обратите внимание, что демон тикер будет генерировать тики каждый ticker_idle_period, когда ваша система не производит события, поэтому совершенно нормально (и ожидаемо) получать пустые пакеты в периоды низкой активности, например, при отладке вашего первого потребителя. Пустые пакеты нужно потреблять как можно быстрее, только позвольте вашему пользовательскому подписчику отдохнуть, когда pgq.next_batch() возвращает NULL.
Когда pgq.next_batch() возвращает идентификатор (bigint), вы получаете возможность обработать события пакета, которые вы получаете с помощью pgq.get_batch_events(batch_id). Что делать с каждым событием должно быть специфично для кода потребителя, а не в общем интерфейсе абстракции PGQ потребления (или библиотеки, пакета, как называется ваше средство программирования).
Функция обработки событий должна иметь возможность помечать события как обработанные (OK), "неудачные" или для "повторной попытки" позже. В случае повторной попытки, событие будет введено снова в последующую партию, в зависимости от того, через какой промежуток времени нужно повторить его.
После обработки всех событий пакета, необходимо вызвать pgq.finish_batch(batch_id), а затем выполнить COMMIT вашей транзакции обработки пакета.
В псевдо-языке это дает:
do batch_id = pgq.next_batch() if batch_id is not null then BEGIN; events = pgq.get_batch_events(batch_id); // this could be a function pointer or a virtual method or a delegate, e.g. user_defined_process_event(events); pgq.finish_batch(batch_id); COMMIT; end if; while true;
F.44.6.2. Удаленное потребление #
Это ситуация, когда обработка событий происходит на другой базе данных, чем та, где события были созданы. Londiste является хорошим примером удаленного потребления.
В этом случае вам потребуется реализовать способ избежать повторной обработки пакета, когда COMMIT обработки прошел успешно, но pgq.finish_batch() (который должен быть выполнен на производящей базе данных, а не на потребляющей) завершился неудачей. Как решить проблему в трех других сценариях оставляется на усмотрение читателя.
Идея pgq_ext заключается в записи на потребительской базе данных последнего обработанного идентификатора пакета (batch_id) и выполнении обновления в рамках транзакции обработки. Данные могут быть уточнены до последнего обработанного идентификатора события (event id) в случаях, когда у вас нет простого способа откатить всю обработку пакетов, если сбой произошел на удаленном сервере.
Как справочная информация, вы можете захотеть знать, что londiste использует pgq_ext, но перемещает записи SQL (функции и таблицы) в схему londiste.
F.44.6.3. Нетранзакционная обработка #
Ваш код потребителя может отправлять почту вместо изменения состояния базы данных. В этом случае обработка не является транзакционной (вы не можете откатить пакетную обработку), и вам придется самостоятельно решать проблемы надежности, PGQ не предоставит волшебство.
F.44.6.4. Использование Python API #
F.44.6.4.1. Использование: счетчик строк для ускорения count(*) #
Вот фрагмент кода на Python для потребителя PGQ:
import pgq class RowCounter(pgq.Consumer): def process_batch(self, db, batch_id, ev_list): tbl = self.cf.get('table_name'); delta = 0 for ev in ev_list: if ev.type == 'I' and ev.extra1 == tbl: delta += 1 elif ev.type == 'D' and ev.extra1 == tbl: delta -= 1 ev.tag_done() q = 'select update_stats(%s, %s)' db.cursor().execute(q, [tbl, delta]) RowCounter('row_counter', 'db', sys.argv[1:]).start()
F.44.6.5. Использование PHP API #
Был также представлен API для PHP, позволяющий легко написать демон-потребитель PGQ на PHP. Смотрите libphp-pgq для более подробной информации.
Вот пример
#!/usr/bin/php5 <?php require( "pgq/PGQRemoteConsumer.php" ); require("conf/duration.php"); define("CONFIGURATION", "conf/duration.php"); $con_src = "dbname=foo_db port=5432 host=localhost"; $con_dst = "dbname=bar_db port=5432 host=localhost" class PGQDaemonExample extends PGQRemoteConsumer { public function config( ) { unset($Config); if( $this->log !== null ) $this->log->notice("Reloading configuration (HUP) from '%s'", CONFIGURATION); global $Config; require(CONFIGURATION); $this->loglevel = $Config["LOGLEVEL"]; $this->logfile = $Config["LOGFILE"]; $this->delay = $Config["DELAY"]; } public function process_event( &$event ) { $this->log->notice("Starting process event"); $id = $event->data["id"]; $code = $event->data["code"]; $data = $event->data["data"]; $this->log->notice("Processing event : %d ", $id); $query = sprintf( "UPDATE table SET ... WHERE ...", $code ); $this->log->debug( $query ); $result = pg_query( $this->pg_dst_con, $query ); if( $result === False ) { $this->log->error( "Unable to update : %s ", $query ); $event->retry_delay = 2 * $this->delay; return PGQ_EVENT_RETRY; } return PGQ_EVENT_OK; } } $daemon = new PGQDaemonExample( "mydaemon", "daemonq", "table", $argc, $argv, $con_src, $con_dst ); ?>